Merge branch 'dev' into zhy_feat_cev_v8

This commit is contained in:
zenghongyao 2025-05-11 10:43:54 +08:00
commit acd0a5c155
12 changed files with 420 additions and 176 deletions

View File

@ -226,7 +226,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";"); code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";");
// 添加 EntityType 属性 // 添加 EntityType 属性
code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};"); code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};");
foreach (var prop in propList) foreach (var prop in propList)
{ {
// 安全类型转换 // 安全类型转换
@ -573,7 +573,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
$"new EntityMemberInfo(" + $"new EntityMemberInfo(" +
$"\"{prop.Name}.{elementName}\", " + $"\"{prop.Name}.{elementName}\", " +
$"typeof({elementType}), " + $"typeof({elementType}), " +
$"\"{elementDeclaredName}\", " + $"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " + $"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))"); $"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
} }

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDB.Exceptions
{
/// <summary>
/// IoTDB异常
/// </summary>
public class IoTException : Exception
{
public int ErrorCode { get; }
public IoTException(string message, int errorCode)
: base($"{message} (Code: {errorCode})")
{
ErrorCode = errorCode;
}
}
}

View File

@ -14,7 +14,6 @@
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Analyzers.Shared\JiShe.CollectBus.Analyzers.Shared.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Analyzers.Shared\JiShe.CollectBus.Analyzers.Shared.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\modules\JiShe.CollectBus.Analyzers\JiShe.CollectBus.Analyzers.csproj" <ProjectReference Include="..\..\modules\JiShe.CollectBus.Analyzers\JiShe.CollectBus.Analyzers.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
OutputItemType="Analyzer" ReferenceOutputAssembly="false"/>
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.IoTDB.Attributes; using JiShe.CollectBus.IoTDB.Attributes;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IoTDB.Model namespace JiShe.CollectBus.IoTDB.Model
{ {
@ -43,5 +44,27 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary> /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
/// <summary>
/// 设备路径
/// </summary>
public virtual string DevicePath
{
get
{
return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
set
{
if (string.IsNullOrWhiteSpace(value))
{
DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
else
{
DevicePath = value;
}
}
}
} }
} }

View File

@ -6,8 +6,18 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// 设备元数据 /// 设备元数据
/// </summary> /// </summary>
public class DeviceMetadata public sealed class DeviceMetadata
{ {
/// <summary>
/// 实体类名称
/// </summary>
public string EntityName { get; set; }
/// <summary>
/// 设备表名或树路径如果实体没有添加TableNameOrTreePath,此处为空
/// </summary>
public string TableNameOrTreePath { get; set; }
/// <summary> /// <summary>
/// 实体类型枚举 /// 实体类型枚举
/// </summary> /// </summary>
@ -31,6 +41,42 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// 值类型集合用于构建Table的值类型也就是dataTypes参数 /// 值类型集合用于构建Table的值类型也就是dataTypes参数
/// </summary> /// </summary>
public List<TSDataType> DataTypes { get; } = new(); public List<TSDataType> DataTypes { get; set; } = new();
/// <summary>
/// 列处理信息集合
/// </summary>
public List<ColumnProcessor> Processors { get; } = new List<ColumnProcessor>();
}
/// <summary>
/// 列处理信息结构
/// </summary>
public struct ColumnProcessor
{
/// <summary>
/// 列名
/// </summary>
public string ColumnName;
/// <summary>
/// 值获取委托
/// </summary>
public Func<object, object> ValueGetter;
/// <summary>
/// 类型转换委托
/// </summary>
public Func<object, object> Converter;
/// <summary>
/// 是否单测点
/// </summary>
public bool IsSingleMeasuring;
/// <summary>
/// 单测点名称委托
/// </summary>
public Func<object, object> SingleMeasuringNameGetter;
} }
} }

View File

@ -21,6 +21,11 @@ using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Exceptions;
using System.Diagnostics.Metrics;
using Newtonsoft.Json.Linq;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Text.RegularExpressions;
namespace JiShe.CollectBus.IoTDB.Provider namespace JiShe.CollectBus.IoTDB.Provider
{ {
@ -67,8 +72,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata); var tablet = BuildTablet(new[] { entity }, metadata);
if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
await CurrentSession.InsertAsync(tablet); await CurrentSession.InsertAsync(tablet.First());
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -94,7 +104,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var batch in batches) foreach (var batch in batches)
{ {
var tablet = BuildTablet(batch, metadata); var tablet = BuildTablet(batch, metadata);
await CurrentSession.InsertAsync(tablet); if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
foreach (var item in tablet)
{
await CurrentSession.InsertAsync(item);
}
} }
} }
catch (Exception ex) catch (Exception ex)
@ -122,7 +140,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var batch in batches) foreach (var batch in batches)
{ {
var tablet = BuildTablet(batch, deviceMetadata); var tablet = BuildTablet(batch, deviceMetadata);
await CurrentSession.InsertAsync(tablet); if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
foreach (var item in tablet)
{
await CurrentSession.InsertAsync(item);
}
} }
} }
catch (Exception ex) catch (Exception ex)
@ -180,22 +206,54 @@ namespace JiShe.CollectBus.IoTDB.Provider
var accessor = SourceEntityAccessorFactory.GetAccessor<T>(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var columns = CollectColumnMetadata<T>(accessor); var columns = CollectColumnMetadata<T>(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var tmpMetadata = BuildDeviceMetadata<T>(columns, accessor);
string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
if (tableNameOrTreePathAttribute != null)
{
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
}
tmpMetadata.EntityName = accessor.EntityName;
tmpMetadata.EntityType = accessor.EntityType;
tmpMetadata.TableNameOrTreePath = tableNameOrTreePath;
var metaData = MetadataCache.AddOrUpdate( var metaData = MetadataCache.AddOrUpdate(
typeof(T), typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加 addValueFactory: t => tmpMetadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) => updateValueFactory: (t, existingValue) =>
{ {
var columns = CollectColumnMetadata(accessor); var columns = CollectColumnMetadata(accessor);
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata(columns, accessor);
//对现有值 existingValue 进行修改,返回新值 //对现有值 existingValue 进行修改,返回新值
string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
if (tableNameOrTreePathAttribute != null)
{
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
}
existingValue.ColumnNames = metadata.ColumnNames; existingValue.ColumnNames = metadata.ColumnNames;
existingValue.DataTypes = metadata.DataTypes;
return existingValue; return existingValue;
} }
); );
metadata.EntityType = accessor.EntityType; //var metaData = MetadataCache.GetOrAdd(typeof(T), type =>
//{
// var columns = CollectColumnMetadata(accessor);
// var metadata = BuildDeviceMetadata(columns, accessor);
// string tableNameOrTreePath = string.Empty;
// var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
// if (tableNameOrTreePathAttribute != null)
// {
// tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
// }
// metadata.EntityName = accessor.EntityName;
// metadata.EntityType = accessor.EntityType;
// metadata.TableNameOrTreePath = tableNameOrTreePath;
// return metadata;
//});
return await Task.FromResult(metaData); return await Task.FromResult(metaData);
} }
@ -257,31 +315,26 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="entities">表实体</param> /// <param name="entities">表实体</param>
/// <param name="metadata">设备元数据</param></param> /// <param name="metadata">设备元数据</param></param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity private List<Tablet> BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{ {
var timestamps = new List<long>(); var entitiyList = entities.ToList();
var values = new List<List<object>>(); if (entitiyList == null || entitiyList.Count <= 0)
var devicePaths = new HashSet<string>();
List<string> tempColumnNames = new List<string>();
tempColumnNames.AddRange(metadata.ColumnNames);
var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询
// 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{ {
memberCache[member.NameOrPath] = member; return null;
} }
if (accessor.EntityType == null || metadata.EntityType == null) //var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
//var memberCache = BuildMemberCache(accessor);
if (metadata.EntityType == null)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101");
} }
if (metadata.EntityType != accessor.EntityType) if (metadata.EntityType == EntityTypeEnum.Other)
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体,属于异常情况,-102");
} }
if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true)
@ -292,103 +345,90 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104"); throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
} }
string tableNameOrTreePath = string.Empty; string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>(); if (_runtimeContext.UseTableSessionPool)//表模型
if (tableNameOrTreePathAttribute != null)
{ {
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
}
foreach (var entity in entities)
{
timestamps.Add(entity.Timestamps);
var rowValues = new List<object>();
foreach (var measurement in tempColumnNames)
{
if (!memberCache.TryGetValue(measurement, out var member))
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息-105");
}
var value = member.GetValue(entity);
// 特性查询优化
var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
var singleMeasuringAttr = attributes.OfType<SingleMeasuringAttribute>().FirstOrDefault();
if (singleMeasuringAttr != null)//如果是单侧点
{
var tupleItemKey = $"{member.NameOrPath}.Item2";
if (!memberCache.TryGetValue(tupleItemKey, out var tupleMember))
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-106");
}
value = tupleMember.GetValue(entity);
}
if (value != null)
{
var tempValue = member.DeclaredTypeName.ToUpper() switch
{
"DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value
};
rowValues.Add(tempValue);
}
else
{
rowValues.Add(value);
}
}
values.Add(rowValues);
//如果指定了路径 //如果指定了路径
if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath))
{ {
devicePaths.Add(tableNameOrTreePath); tableNameOrTreePath = metadata.TableNameOrTreePath;
} }
else else
{ {
if (!_runtimeContext.UseTableSessionPool)//树模型 tableNameOrTreePath = DevicePathBuilder.GetTableName<T>();
}
return new List<Tablet>() { BuildTablet(entitiyList, metadata, tableNameOrTreePath) };
}
else
{
//树模型的时候实体的设备Id可能会不同因此需要根据不同路径进行存储。
var tabletList = new List<Tablet>();
var groupEntities = entitiyList.GroupBy(d => d.DevicePath).ToList();
foreach (var group in groupEntities)
{
tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key));
}
return tabletList;
}
}
private Tablet BuildTablet<T>(List<T> entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity
{
// 预分配内存结构
var rowCount = entities.Count;
var timestamps = new long[rowCount];
var values = new object[rowCount][];
for (var i = 0; i < values.Length; i++)
{
values[i] = new object[metadata.ColumnNames.Count];
}
List<string> tempColumnNames = new List<string>();
tempColumnNames.AddRange(metadata.ColumnNames);
// 顺序处理数据(保证线程安全)
for (var row = 0; row < rowCount; row++)
{
var entity = entities[row];
timestamps[row] = entity.Timestamps;
for (int i = 0; i < metadata.ColumnNames.Count; i++)
{
var processor = metadata.Processors[i];
if (processor.IsSingleMeasuring)
{ {
devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); tempColumnNames[i] = (string)processor.SingleMeasuringNameGetter(entity);
}
else
{
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
} }
// 获取并转换值
values[row][i] = processor.ValueGetter(entity);
} }
} }
if (devicePaths.Count > 1)
{
throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。");
}
return _runtimeContext.UseTableSessionPool return _runtimeContext.UseTableSessionPool
? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps) ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList())
: BuildSessionTablet(metadata, devicePaths.First(), values, timestamps); : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList());
} }
/// <summary> /// <summary>
/// 构建tree模型的Tablet /// 构建tree模型的Tablet
/// </summary> /// </summary>
/// <param name="metadata">已解析的设备数据元数据</param> /// <param name="metadata">已解析的设备数据元数据</param>
/// <param name="devicePath">设备路径</param> /// <param name="devicePath">设备路径</param>
/// <param name="columns">数据列集合</param>
/// <param name="values">数据集合</param> /// <param name="values">数据集合</param>
/// <param name="timestamps">时间戳集合</param> /// <param name="timestamps">时间戳集合</param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List<List<object>> values, List<long> timestamps) private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List<string> columns, List<List<object>> values, List<long> timestamps)
{ {
//todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段只需要保留FIELD类型字段即可 //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段只需要保留FIELD类型字段即可
return new Tablet( return new Tablet(
devicePath, devicePath,
metadata.ColumnNames, columns,
metadata.DataTypes, metadata.DataTypes,
values, values,
timestamps timestamps
@ -400,14 +440,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// </summary> /// </summary>
/// <param name="metadata">已解析的设备数据元数据</param> /// <param name="metadata">已解析的设备数据元数据</param>
/// <param name="tableName">表名称</param> /// <param name="tableName">表名称</param>
/// <param name="columns">数据列集合</param>
/// <param name="values">数据集合</param> /// <param name="values">数据集合</param>
/// <param name="timestamps">时间戳集合</param> /// <param name="timestamps">时间戳集合</param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<List<object>> values, List<long> timestamps) private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<string> columns, List<List<object>> values, List<long> timestamps)
{ {
var tablet = new Tablet( var tablet = new Tablet(
tableName, tableName,
metadata.ColumnNames, columns,
metadata.ColumnCategories, metadata.ColumnCategories,
metadata.DataTypes, metadata.DataTypes,
values, values,
@ -535,13 +576,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var accessor = SourceEntityAccessorFactory.GetAccessor<T>(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询 var memberCache = BuildMemberCache(accessor);
// 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{
memberCache[member.NameOrPath] = member;
}
var columns = new List<string>() { "Timestamps" }; var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP }; var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
@ -596,13 +631,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private List<ColumnInfo> CollectColumnMetadata<T>(ISourceEntityAccessor<T> accessor) private List<ColumnInfo> CollectColumnMetadata<T>(ISourceEntityAccessor<T> accessor)
{ {
var columns = new List<ColumnInfo>(); var columns = new List<ColumnInfo>();
var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询 var memberCache = BuildMemberCache(accessor);
// 预构建成员缓存Key: NameOrPath
foreach (var member in accessor.MemberList)
{
memberCache[member.NameOrPath] = member;
}
foreach (var member in accessor.MemberList) foreach (var member in accessor.MemberList)
{ {
@ -625,15 +654,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
ColumnInfo? column = null; ColumnInfo? column = null;
if (tagAttr != null) if (tagAttr != null)
{ {
column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false); column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
} }
else if (attrColumn != null) else if (attrColumn != null)
{ {
column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false); column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
} }
else if (fieldColumn != null) else if (fieldColumn != null)
{ {
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false); column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
} }
// 单测模式处理 // 单测模式处理
@ -644,7 +673,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常"); throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常");
} }
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true); column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true, tupleMember.DeclaredTypeName);
} }
if (column.HasValue) columns.Add(column.Value); if (column.HasValue) columns.Add(column.Value);
@ -658,7 +687,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="typeInfo">待解析的类</param> /// <param name="typeInfo">待解析的类</param>
/// <param name="columns">已处理好的数据列</param> /// <param name="columns">已处理好的数据列</param>
/// <returns></returns> /// <returns></returns>
private DeviceMetadata BuildDeviceMetadata<T>(List<ColumnInfo> columns) where T : IoTEntity private DeviceMetadata BuildDeviceMetadata<T>(List<ColumnInfo> columns, ISourceEntityAccessor<T> accessor) where T : IoTEntity
{ {
var metadata = new DeviceMetadata(); var metadata = new DeviceMetadata();
@ -677,9 +706,97 @@ namespace JiShe.CollectBus.IoTDB.Provider
ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata);
ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata);
// 新增处理器初始化
foreach (var item in metadata.ColumnNames)
{
ColumnInfo column = columns.FirstOrDefault(d => d.Name == item);
var processor = new ColumnProcessor
{
ColumnName = column.Name,
IsSingleMeasuring = column.IsSingleMeasuring,
Converter = CreateConverter(column.DeclaredTypeName.ToUpper())
};
// 处理单测点
if (column.IsSingleMeasuring)
{
var item1Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item1");
processor.SingleMeasuringNameGetter = (obj) =>
{
// 获取原始值并转为字符串
object rawValue = item1Member.Getter(obj);
string value = rawValue?.ToString();
if (!string.IsNullOrEmpty(value))
{
// 规则1: 严格检查ASCII字母和数字0-9, A-Z, a-z
bool hasInvalidChars = value.Any(c =>
!((c >= 'A' && c <= 'Z') ||
(c >= 'a' && c <= 'z') ||
(c >= '0' && c <= '9')));
// 规则2: 首字符不能是数字
bool startsWithDigit = value[0] >= '0' && value[0] <= '9';
// 规则3: 全字符串不能都是数字
bool allDigits = value.All(c => c >= '0' && c <= '9');
// 按优先级抛出具体异常
if (hasInvalidChars)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字");
}
else if (startsWithDigit)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能以数字开头");
}
else if (allDigits)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能全为数字");
}
}
return value;
};
var item2Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item2");
processor.ValueGetter = (obj) => {
object rawValue = item2Member.Getter(obj);
return processor.Converter(rawValue);
};
}
else
{
// 获取对应的成员访问器
var member = accessor.MemberList.First(m => m.NameOrPath == column.Name);
processor.ValueGetter = (obj) => {
object rawValue = member.Getter(obj);
return processor.Converter(rawValue);
};
}
metadata.Processors.Add(processor);
}
return metadata; return metadata;
} }
private Func<object, object> CreateConverter(string declaredTypeName)
{
return declaredTypeName switch
{
"DATETIME" => value => ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value => value
};
}
/// <summary> /// <summary>
/// 处理不同列类型的逻辑 /// 处理不同列类型的逻辑
/// </summary> /// </summary>
@ -706,6 +823,11 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// </summary> /// </summary>
public string Name { get; } public string Name { get; }
/// <summary>
/// 声明的类型的名称
/// </summary>
public string DeclaredTypeName { get; }
/// <summary> /// <summary>
/// 是否是单测点 /// 是否是单测点
/// </summary> /// </summary>
@ -721,12 +843,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// </summary> /// </summary>
public TSDataType DataType { get; } public TSDataType DataType { get; }
public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName)
{ {
Name = name; Name = name;
Category = category; Category = category;
DataType = dataType; DataType = dataType;
IsSingleMeasuring = isSingleMeasuring; IsSingleMeasuring = isSingleMeasuring;
DeclaredTypeName = declaredTypeName;
} }
} }
@ -807,5 +930,23 @@ namespace JiShe.CollectBus.IoTDB.Provider
TSDataType.STRING => Convert.ToString(value), TSDataType.STRING => Convert.ToString(value),
_ => Convert.ToString(value) _ => Convert.ToString(value)
}; };
/// <summary>
/// 缓存实体属性信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="accessor"></param>
/// <returns></returns>
private Dictionary<string, EntityMemberInfo> BuildMemberCache<T>(ISourceEntityAccessor<T> accessor)
{
var cache = new Dictionary<string, EntityMemberInfo>(StringComparer.Ordinal);
foreach (var member in accessor.MemberList)
{
cache[member.NameOrPath] = member;
}
return cache;
}
private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled);
} }
} }

View File

@ -140,13 +140,13 @@ namespace JiShe.CollectBus.DataChannels
// 批量写入数据库 // 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records); await _dbProvider.BatchInsertAsync(metadata, records);
//// 限流推送Kafka // 限流推送Kafka
//await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: records, items: records,
// deviceIdSelector: data => data.DeviceId, deviceIdSelector: data => data.DeviceId,
// processor: async (data, groupIndex) => processor: async (data, groupIndex) =>
// await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
//); );
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -184,14 +184,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
time = DateTime.Now; time = DateTime.Now;
//System.Reflection.PropertyInfo; //System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo //System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<string>() var meter = new TreeModelSingleMeasuringEntity<DateTime>()
{ {
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "1", DeviceType = "1",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, value) SingleMeasuring = (measuring, time)
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }

View File

@ -7,8 +7,9 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
@ -17,8 +18,8 @@ using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System; using System;
@ -26,14 +27,6 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Models;
using System.Threading.Channels;
using static IdentityModel.ClaimComparer;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static System.Formats.Asn1.AsnWriter;
using System.Threading;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -329,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
return; //return;
// 创建取消令牌源 // 创建取消令牌源
//var cts = new CancellationTokenSource(); //var cts = new CancellationTokenSource();
@ -744,7 +737,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = tempItem, ItemCode = tempItem,
DataTimeMark = new Protocol.DataTimeMark() DataTimeMark = new Protocol.DataTimeMark()
{ {
Density = ammeterInfo.TimeDensity,//todo 转换成协议的值 Density = ammeterInfo.TimeDensity.GetDensity(),//转换成协议的值
Point = 1, Point = 1,
DataTime = timestamps, DataTime = timestamps,
} }
@ -1354,7 +1347,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>(); List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H; var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H;
//var subItemCode = T6452007PacketItemCodeConst.C08; //var subItemCode = T6452007PacketItemCodeConst.C08;
@ -1643,37 +1635,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
{ {
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); try
return new MeterReadingTelemetryPacketInfo()
{ {
SystemName = SystemType, string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
ProjectId = $"{ammeterInfo.ProjectID}", return new MeterReadingTelemetryPacketInfo()
DeviceType = $"{MeterTypeEnum.Ammeter}", {
DeviceId = $"{ammeterInfo.MeterId}", SystemName = SystemType,
Timestamps = timestamps, ProjectId = $"{ammeterInfo.ProjectID}",
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DeviceType = $"{MeterTypeEnum.Ammeter}",
PendingCopyReadTime = pendingCopyReadTime, DeviceId = $"{ammeterInfo.MeterId}",
CreationTime = creationTime, Timestamps = timestamps,
MeterAddress = ammeterInfo.AmmerterAddress, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PacketType = (int)packetType, PendingCopyReadTime = pendingCopyReadTime,
AFN = builderResponse.AFn, CreationTime = creationTime,
Fn = builderResponse.Fn, MeterAddress = ammeterInfo.AmmerterAddress,
Seq = builderResponse.Seq, PacketType = (int)packetType,
MSA = builderResponse.MSA, AFN = builderResponse.AFn,
FocusId = ammeterInfo.FocusId, Fn = builderResponse.Fn,
FocusAddress = ammeterInfo.FocusAddress, Seq = builderResponse.Seq,
ItemCode = itemCode, MSA = builderResponse.MSA,
SubItemCode = subItemCode, FocusId = ammeterInfo.FocusId,
TaskMark = taskMark, FocusAddress = ammeterInfo.FocusAddress,
IsSend = false, ItemCode = itemCode,
ManualOrNot = false, SubItemCode = subItemCode,
Pn = ammeterInfo.MeteringCode, TaskMark = taskMark,
IssuedMessageId = GuidGenerator.Create().ToString(), IsSend = false,
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), ManualOrNot = false,
IsReceived = false, Pn = ammeterInfo.MeteringCode,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), IssuedMessageId = Guid.NewGuid().ToString(),
}; IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
} IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
}
catch (Exception ex)
{
throw ex;
}
}
#endregion #endregion
} }

View File

@ -859,6 +859,21 @@ namespace JiShe.CollectBus.Common.Helpers
} }
/// <summary>
/// 采集频率转换为集中器采集密度
/// </summary>
/// <param name="timeDensity"></param>
/// <returns></returns>
public static int GetDensity(this int timeDensity) =>
timeDensity switch
{
0 => 0,//无
1 => 255,//1分钟
5 => 245,//5分钟
15 => 1,//15分钟
30 => 2,//30分钟
60 => 3,//60分钟
_ => -1//采集项本身无密度位
};
} }
} }

View File

@ -15,8 +15,7 @@
<meta http-equiv="X-UA-Compatible" content="IE=edge" /> <meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" /> <link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" />
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>

View File

@ -141,7 +141,7 @@
} }
}, },
"ServerApplicationOptions": { "ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus99", "ServerTagName": "JiSheCollectBus4",
"SystemType": "Energy", "SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00", "FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00", "AutomaticVerificationTime": "16:07:00",