diff --git a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs index bad9528..ecb9c78 100644 --- a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs +++ b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs @@ -226,7 +226,7 @@ namespace JiShe.CollectBus.IncrementalGenerator code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";"); // 添加 EntityType 属性 code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};"); - + foreach (var prop in propList) { // 安全类型转换 @@ -573,7 +573,7 @@ namespace JiShe.CollectBus.IncrementalGenerator $"new EntityMemberInfo(" + $"\"{prop.Name}.{elementName}\", " + $"typeof({elementType}), " + - $"\"{elementDeclaredName}\", " + + $"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " + $"(e) => Get{prop.Name}_{elementName}(({entityType})e), " + $"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))"); } diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index f2d55a5..9df2488 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.IoTDB.Attributes; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Model { @@ -43,5 +44,27 @@ namespace JiShe.CollectBus.IoTDB.Model /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + /// + /// 设备路径 + /// + public virtual string DevicePath + { + get + { + return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + set + { + if (string.IsNullOrWhiteSpace(value)) + { + DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + else + { + DevicePath = value; + } + } + } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index f48d218..92c33ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -6,8 +6,18 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 设备元数据 /// - public class DeviceMetadata + public sealed class DeviceMetadata { + /// + /// 实体类名称 + /// + public string EntityName { get; set; } + + /// + /// 设备表名或树路径,如果实体没有添加TableNameOrTreePath,此处为空 + /// + public string TableNameOrTreePath { get; set; } + /// /// 实体类型枚举 /// @@ -31,6 +41,42 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 值类型集合,用于构建Table的值类型,也就是dataTypes参数 /// - public List DataTypes { get; } = new(); + public List DataTypes { get; set; } = new(); + + /// + /// 列处理信息集合 + /// + public List Processors { get; } = new List(); + } + + /// + /// 列处理信息结构 + /// + public struct ColumnProcessor + { + /// + /// 列名 + /// + public string ColumnName; + + /// + /// 值获取委托 + /// + public Func ValueGetter; + + /// + /// 类型转换委托 + /// + public Func Converter; + + /// + /// 是否单测点 + /// + public bool IsSingleMeasuring; + + /// + /// 单测点名称委托 + /// + public Func SingleMeasuringNameGetter; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index db86791..34b4f4b 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -22,6 +22,10 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.IoTDB.Exceptions; +using System.Diagnostics.Metrics; +using Newtonsoft.Json.Linq; +using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Text.RegularExpressions; namespace JiShe.CollectBus.IoTDB.Provider { @@ -68,8 +72,13 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet.First()); } catch (Exception ex) { @@ -95,7 +104,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -123,7 +140,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, deviceMetadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -181,22 +206,54 @@ namespace JiShe.CollectBus.IoTDB.Provider var accessor = SourceEntityAccessorFactory.GetAccessor(); var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var tmpMetadata = BuildDeviceMetadata(columns, accessor); + + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } + tmpMetadata.EntityName = accessor.EntityName; + tmpMetadata.EntityType = accessor.EntityType; + tmpMetadata.TableNameOrTreePath = tableNameOrTreePath; + var metaData = MetadataCache.AddOrUpdate( typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 + addValueFactory: t => tmpMetadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var metadata = BuildDeviceMetadata(columns, accessor); //对现有值 existingValue 进行修改,返回新值 + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } existingValue.ColumnNames = metadata.ColumnNames; + existingValue.DataTypes = metadata.DataTypes; return existingValue; } ); - metadata.EntityType = accessor.EntityType; + //var metaData = MetadataCache.GetOrAdd(typeof(T), type => + //{ + // var columns = CollectColumnMetadata(accessor); + // var metadata = BuildDeviceMetadata(columns, accessor); + // string tableNameOrTreePath = string.Empty; + // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + // if (tableNameOrTreePathAttribute != null) + // { + // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + // } + // metadata.EntityName = accessor.EntityName; + // metadata.EntityType = accessor.EntityType; + // metadata.TableNameOrTreePath = tableNameOrTreePath; + // return metadata; + //}); return await Task.FromResult(metaData); } @@ -251,338 +308,110 @@ namespace JiShe.CollectBus.IoTDB.Provider } } - ///// - ///// 构建Tablet - ///// - ///// - ///// 表实体 - ///// 设备元数据 - ///// - //private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity - //{ - // var timestamps = new List(); - // var values = new List>(); - // var devicePaths = new HashSet(); - // List tempColumnNames = new List(); - // tempColumnNames.AddRange(metadata.ColumnNames); - - // var accessor = SourceEntityAccessorFactory.GetAccessor(); - - // var memberCache = new Dictionary(); // 缓存优化查询 - // // 预构建成员缓存(Key: NameOrPath) - // foreach (var member in accessor.MemberList) - // { - // memberCache[member.NameOrPath] = member; - // } - - // if (accessor.EntityType == null || metadata.EntityType == null) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); - // } - - // if (metadata.EntityType != accessor.EntityType) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); - // } - - // if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); - // } - // else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); - // } - - // string tableNameOrTreePath = string.Empty; - // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); - // if (tableNameOrTreePathAttribute != null) - // { - // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; - // } - - // foreach (var entity in entities) - // { - // timestamps.Add(entity.Timestamps); - // var rowValues = new List(); - - // foreach (var measurement in metadata.ColumnNames) - // { - // if (!memberCache.TryGetValue(measurement, out var member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息,-105"); - // } - - // var value = member.GetValue(entity); - - // // 特性查询优化 - // var attributes = member.CustomAttributes ?? Enumerable.Empty(); - // var singleMeasuringAttr = attributes.OfType().FirstOrDefault(); - // if (singleMeasuringAttr != null)//如果是单侧点 - // { - - // var tupleItem1Key = $"{member.NameOrPath}.Item1"; - // if (!memberCache.TryGetValue(tupleItem1Key, out var tuple1Member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item1 信息,-106"); - // } - // int indexOf = metadata.ColumnNames.IndexOf(measurement); - // tempColumnNames[indexOf] = (string)tuple1Member.GetValue(entity); - - // var tupleItem2Key = $"{member.NameOrPath}.Item2"; - // if (!memberCache.TryGetValue(tupleItem2Key, out var tuple2Member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-107"); - // } - - // value = tuple2Member.GetValue(entity); - // } - - // if (value != null) - // { - // var tempValue = member.DeclaredTypeName.ToUpper() switch - // { - // "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - // _ => value - // }; - - // rowValues.Add(tempValue); - // } - // else - // { - // rowValues.Add(value); - // } - // } - - // values.Add(rowValues); - - // //如果指定了路径 - // if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) - // { - // devicePaths.Add(tableNameOrTreePath); - // } - // else - // { - // if (!_runtimeContext.UseTableSessionPool)//树模型 - // { - // devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); - // } - // else - // { - // devicePaths.Add(DevicePathBuilder.GetTableName()); - // } - // } - // } - - // if (devicePaths.Count > 1) - // { - // throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); - // } - - // return _runtimeContext.UseTableSessionPool - // ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) - // : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames,values, timestamps); - //} - /// /// 构建Tablet /// /// - /// 表实体集合 + /// 表实体 /// 设备元数据 /// - private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity + private List BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { - // 前置校验 - ValidateMetadataAndAccessor(metadata, out var accessor); - - // 初始化数据结构 - var (timestamps, values, devicePaths) = (new List(), new List>(), new HashSet()); - var tempColumnNames = new List(metadata.ColumnNames); - var memberCache = BuildMemberCache(accessor); - var tableNameOrTreePath = GetTableNameOrTreePath(); - - // 处理每个实体 - foreach (var entity in entities) + var entitiyList = entities.ToList(); + if (entitiyList == null || entitiyList.Count <= 0) { - ProcessEntity(entity, accessor, metadata, memberCache, tempColumnNames, timestamps, values); - UpdateDevicePaths(entity, tableNameOrTreePath, devicePaths); + return null; + } + + //var accessor = SourceEntityAccessorFactory.GetAccessor(); + + //var memberCache = BuildMemberCache(accessor); + + if (metadata.EntityType == null) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + } + + if (metadata.EntityType == EntityTypeEnum.Other) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体,属于异常情况,-102"); + } + + if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + } + else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + } + string tableNameOrTreePath = string.Empty; + if (_runtimeContext.UseTableSessionPool)//表模型 + { + //如果指定了路径 + if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)) + { + tableNameOrTreePath = metadata.TableNameOrTreePath; + } + else + { + tableNameOrTreePath = DevicePathBuilder.GetTableName(); + } + + return new List() { BuildTablet(entitiyList, metadata, tableNameOrTreePath) }; + } + else + { + //树模型的时候,实体的设备Id可能会不同,因此需要根据不同路径进行存储。 + var tabletList = new List(); + var groupEntities = entitiyList.GroupBy(d => d.DevicePath).ToList(); + foreach (var group in groupEntities) + { + tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key)); + } + + return tabletList; + } + } + + private Tablet BuildTablet(List entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity + { + // 预分配内存结构 + var rowCount = entities.Count; + var timestamps = new long[rowCount]; + var values = new object[rowCount][]; + for (var i = 0; i < values.Length; i++) + { + values[i] = new object[metadata.ColumnNames.Count]; + } + + List tempColumnNames = new List(); + tempColumnNames.AddRange(metadata.ColumnNames); + + // 顺序处理数据(保证线程安全) + for (var row = 0; row < rowCount; row++) + { + var entity = entities[row]; + timestamps[row] = entity.Timestamps; + + for (int i = 0; i < metadata.ColumnNames.Count; i++) + { + var processor = metadata.Processors[i]; + if (processor.IsSingleMeasuring) + { + tempColumnNames[i] = (string)processor.SingleMeasuringNameGetter(entity); + } + + // 获取并转换值 + values[row][i] = processor.ValueGetter(entity); + } } - // 后置校验与返回 - ValidateDevicePaths(devicePaths); - // return CreateFinalTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); return _runtimeContext.UseTableSessionPool - ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) - : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); + ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()) + : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()); } - private void ValidateMetadataAndAccessor(DeviceMetadata metadata, out ISourceEntityAccessor accessor) where T : IoTEntity - { - accessor = SourceEntityAccessorFactory.GetAccessor(); - - if (accessor.EntityType == null || metadata.EntityType == null) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType未指定", -101); - } - - if (metadata.EntityType != accessor.EntityType) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType不一致", -102); - } - - bool isTableModel = accessor.EntityType == EntityTypeEnum.TableModel; - if (_runtimeContext.UseTableSessionPool != isTableModel) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,Session类型不匹配: 预期{(isTableModel ? "Table" : "Tree")}模型", isTableModel ? -104 : -103); - } - } - - /// - /// 处理实体并获取值 - /// - /// - /// - /// - /// - /// - /// - /// - /// - /// - private void ProcessEntity( - T entity, - ISourceEntityAccessor accessor, - DeviceMetadata metadata, - Dictionary memberCache, - List tempColumnNames, - List timestamps, - List> values) where T : IoTEntity - { - timestamps.Add(entity.Timestamps); - var rowValues = new object[metadata.ColumnNames.Count]; - - Parallel.ForEach(metadata.ColumnNames, (measurement, state, index) => - { - if (!memberCache.TryGetValue(measurement, out var member)) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,找不到成员: {measurement}", -105); - } - - object value = ResolveMemberValue(entity, member, memberCache, tempColumnNames, (int)index); - rowValues[index] = ConvertValueByType(member, value); - }); - - values.Add(rowValues.ToList()); - } - - private object ResolveMemberValue( - T entity, - EntityMemberInfo member, - Dictionary memberCache, - List tempColumnNames, - int columnIndex) where T : IoTEntity - { - // 单测点逻辑 - if (member.CustomAttributes?.OfType().FirstOrDefault() is { } attr) - { - var tuple1Key = $"{member.NameOrPath}.Item1"; - var tuple2Key = $"{member.NameOrPath}.Item2"; - - if (!memberCache.TryGetValue(tuple1Key, out var tuple1) || !memberCache.TryGetValue(tuple2Key, out var tuple2)) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,单侧点元组成员缺失", -106); - } - - tempColumnNames[columnIndex] = (string)tuple1.GetValue(entity); - return tuple2.GetValue(entity); - } - return member.GetValue(entity); - } - - /// - /// 设置实体的成员值 - /// - /// - /// - /// - private object ConvertValueByType(EntityMemberInfo member, object value) - { - return member.DeclaredTypeName switch - { - "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - _ => value - }; - } - - /// - /// 处理设备路径 - /// - /// - /// - /// - /// - private void UpdateDevicePaths( - T entity, - string tableNameOrTreePath, - HashSet devicePaths) where T : IoTEntity - { - if (!string.IsNullOrEmpty(tableNameOrTreePath)) - { - devicePaths.Add(tableNameOrTreePath); - return; - } - - var path = _runtimeContext.UseTableSessionPool - ? DevicePathBuilder.GetTableName() - : DevicePathBuilder.GetDevicePath(entity); - devicePaths.Add(path); - } - - /// - /// 验证设备路径 - /// - /// - private void ValidateDevicePaths(HashSet devicePaths) - { - if (devicePaths.Count == 0) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径集合为空", -108); - } - - if (devicePaths.Count > 1) - { - var paths = string.Join(", ", devicePaths.Take(3)); - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径不一致。检测到路径: {paths}...", -109); - } - } - } - - /// - /// 缓存优化:避免重复反射 - /// - /// - /// - private string GetTableNameOrTreePath() - { - return AttributeCache.TableNameOrTreePath; - } - - /// - /// 特性缓存辅助类 - /// - /// - private static class AttributeCache - { - public static readonly string TableNameOrTreePath; - - static AttributeCache() - { - var attr = typeof(T).GetCustomAttribute(); - TableNameOrTreePath = attr?.TableNameOrTreePath ?? string.Empty; - } - } /// /// 构建tree模型的Tablet @@ -615,7 +444,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns,List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns, List> values, List timestamps) { var tablet = new Tablet( tableName, @@ -802,7 +631,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private List CollectColumnMetadata(ISourceEntityAccessor accessor) { var columns = new List(); - var memberCache = BuildMemberCache(accessor); + var memberCache = BuildMemberCache(accessor); foreach (var member in accessor.MemberList) { @@ -825,15 +654,15 @@ namespace JiShe.CollectBus.IoTDB.Provider ColumnInfo? column = null; if (tagAttr != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (attrColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (fieldColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } // 单测模式处理 @@ -844,7 +673,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常"); } - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true, tupleMember.DeclaredTypeName); } if (column.HasValue) columns.Add(column.Value); @@ -858,7 +687,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 待解析的类 /// 已处理好的数据列 /// - private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity + private DeviceMetadata BuildDeviceMetadata(List columns, ISourceEntityAccessor accessor) where T : IoTEntity { var metadata = new DeviceMetadata(); @@ -877,9 +706,97 @@ namespace JiShe.CollectBus.IoTDB.Provider ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); + // 新增处理器初始化 + foreach (var item in metadata.ColumnNames) + { + ColumnInfo column = columns.FirstOrDefault(d => d.Name == item); + + var processor = new ColumnProcessor + { + ColumnName = column.Name, + IsSingleMeasuring = column.IsSingleMeasuring, + Converter = CreateConverter(column.DeclaredTypeName.ToUpper()) + }; + + // 处理单测点 + if (column.IsSingleMeasuring) + { + var item1Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item1"); + + processor.SingleMeasuringNameGetter = (obj) => + { + // 获取原始值并转为字符串 + object rawValue = item1Member.Getter(obj); + string value = rawValue?.ToString(); + + if (!string.IsNullOrEmpty(value)) + { + // 规则1: 严格检查ASCII字母和数字(0-9, A-Z, a-z) + bool hasInvalidChars = value.Any(c => + !((c >= 'A' && c <= 'Z') || + (c >= 'a' && c <= 'z') || + (c >= '0' && c <= '9'))); + + // 规则2: 首字符不能是数字 + bool startsWithDigit = value[0] >= '0' && value[0] <= '9'; + + // 规则3: 全字符串不能都是数字 + bool allDigits = value.All(c => c >= '0' && c <= '9'); + + // 按优先级抛出具体异常 + if (hasInvalidChars) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字"); + } + else if (startsWithDigit) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能以数字开头"); + } + else if (allDigits) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能全为数字"); + } + } + + return value; + }; + + var item2Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item2"); + processor.ValueGetter = (obj) => { + object rawValue = item2Member.Getter(obj); + return processor.Converter(rawValue); + }; + } + else + { + // 获取对应的成员访问器 + var member = accessor.MemberList.First(m => m.NameOrPath == column.Name); + processor.ValueGetter = (obj) => { + object rawValue = member.Getter(obj); + return processor.Converter(rawValue); + }; + } + + metadata.Processors.Add(processor); + } + return metadata; } + private Func CreateConverter(string declaredTypeName) + { + return declaredTypeName switch + { + "DATETIME" => value => ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value => value + }; + } + /// /// 处理不同列类型的逻辑 /// @@ -906,6 +823,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public string Name { get; } + /// + /// 声明的类型的名称 + /// + public string DeclaredTypeName { get; } + /// /// 是否是单测点 /// @@ -921,12 +843,13 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public TSDataType DataType { get; } - public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) + public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName) { Name = name; Category = category; DataType = dataType; IsSingleMeasuring = isSingleMeasuring; + DeclaredTypeName = declaredTypeName; } } @@ -1023,5 +946,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } return cache; } + + private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled); } } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index d6f9e04..c24ad18 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -140,13 +140,13 @@ namespace JiShe.CollectBus.DataChannels // 批量写入数据库 await _dbProvider.BatchInsertAsync(metadata, records); - //// 限流推送Kafka - //await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: records, - // deviceIdSelector: data => data.DeviceId, - // processor: async (data, groupIndex) => - // await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) - //); + // 限流推送Kafka + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: records, + deviceIdSelector: data => data.DeviceId, + processor: async (data, groupIndex) => + await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) + ); } catch (Exception ex) { diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 843c6f7..0009e20 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -184,14 +184,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS time = DateTime.Now; //System.Reflection.PropertyInfo; //System.Reflection.FieldInfo - var meter = new TreeModelSingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", DeviceType = "1", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), - SingleMeasuring = (measuring, value) + SingleMeasuring = (measuring, time) }; await _iotDBProvider.InsertAsync(meter); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index bab879d..9d45f08 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - //return; + //return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); @@ -1635,36 +1635,43 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) { - string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); - return new MeterReadingTelemetryPacketInfo() + try { - SystemName = SystemType, - ProjectId = $"{ammeterInfo.ProjectID}", - DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.MeterId}", - Timestamps = timestamps, - DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, - CreationTime = creationTime, - MeterAddress = ammeterInfo.AmmerterAddress, - PacketType = (int)packetType, - AFN = builderResponse.AFn, - Fn = builderResponse.Fn, - Seq = builderResponse.Seq, - MSA = builderResponse.MSA, - FocusId = ammeterInfo.FocusId, - FocusAddress = ammeterInfo.FocusAddress, - ItemCode = itemCode, - SubItemCode = subItemCode, - TaskMark = taskMark, - IsSend = false, - ManualOrNot = false, - Pn = ammeterInfo.MeteringCode, - IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), - IsReceived = false, - ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), - }; + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); + return new MeterReadingTelemetryPacketInfo() + { + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MeterId}", + Timestamps = timestamps, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = creationTime, + MeterAddress = ammeterInfo.AmmerterAddress, + PacketType = (int)packetType, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, + MSA = builderResponse.MSA, + FocusId = ammeterInfo.FocusId, + FocusAddress = ammeterInfo.FocusAddress, + ItemCode = itemCode, + SubItemCode = subItemCode, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = Guid.NewGuid().ToString(), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, + ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), + }; + } + catch (Exception ex) + { + throw ex; + } } #endregion