diff --git a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs index bad9528..ecb9c78 100644 --- a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs +++ b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs @@ -226,7 +226,7 @@ namespace JiShe.CollectBus.IncrementalGenerator code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";"); // 添加 EntityType 属性 code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};"); - + foreach (var prop in propList) { // 安全类型转换 @@ -573,7 +573,7 @@ namespace JiShe.CollectBus.IncrementalGenerator $"new EntityMemberInfo(" + $"\"{prop.Name}.{elementName}\", " + $"typeof({elementType}), " + - $"\"{elementDeclaredName}\", " + + $"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " + $"(e) => Get{prop.Name}_{elementName}(({entityType})e), " + $"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))"); } diff --git a/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs b/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs new file mode 100644 index 0000000..93bed4f --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDB.Exceptions +{ + /// + /// IoTDB异常 + /// + public class IoTException : Exception + { + public int ErrorCode { get; } + + public IoTException(string message, int errorCode) + : base($"{message} (Code: {errorCode})") + { + ErrorCode = errorCode; + } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj index 3911399..78b81e3 100644 --- a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj +++ b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj @@ -14,7 +14,6 @@ - + diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index f2d55a5..9df2488 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.IoTDB.Attributes; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Model { @@ -43,5 +44,27 @@ namespace JiShe.CollectBus.IoTDB.Model /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + /// + /// 设备路径 + /// + public virtual string DevicePath + { + get + { + return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + set + { + if (string.IsNullOrWhiteSpace(value)) + { + DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + else + { + DevicePath = value; + } + } + } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index f48d218..92c33ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -6,8 +6,18 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 设备元数据 /// - public class DeviceMetadata + public sealed class DeviceMetadata { + /// + /// 实体类名称 + /// + public string EntityName { get; set; } + + /// + /// 设备表名或树路径,如果实体没有添加TableNameOrTreePath,此处为空 + /// + public string TableNameOrTreePath { get; set; } + /// /// 实体类型枚举 /// @@ -31,6 +41,42 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 值类型集合,用于构建Table的值类型,也就是dataTypes参数 /// - public List DataTypes { get; } = new(); + public List DataTypes { get; set; } = new(); + + /// + /// 列处理信息集合 + /// + public List Processors { get; } = new List(); + } + + /// + /// 列处理信息结构 + /// + public struct ColumnProcessor + { + /// + /// 列名 + /// + public string ColumnName; + + /// + /// 值获取委托 + /// + public Func ValueGetter; + + /// + /// 类型转换委托 + /// + public Func Converter; + + /// + /// 是否单测点 + /// + public bool IsSingleMeasuring; + + /// + /// 单测点名称委托 + /// + public Func SingleMeasuringNameGetter; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index bacbf61..34b4f4b 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -21,6 +21,11 @@ using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using JiShe.CollectBus.Analyzers.Shared; +using JiShe.CollectBus.IoTDB.Exceptions; +using System.Diagnostics.Metrics; +using Newtonsoft.Json.Linq; +using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Text.RegularExpressions; namespace JiShe.CollectBus.IoTDB.Provider { @@ -67,8 +72,13 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet.First()); } catch (Exception ex) { @@ -94,7 +104,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -122,7 +140,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, deviceMetadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -180,22 +206,54 @@ namespace JiShe.CollectBus.IoTDB.Provider var accessor = SourceEntityAccessorFactory.GetAccessor(); var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var tmpMetadata = BuildDeviceMetadata(columns, accessor); + + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } + tmpMetadata.EntityName = accessor.EntityName; + tmpMetadata.EntityType = accessor.EntityType; + tmpMetadata.TableNameOrTreePath = tableNameOrTreePath; + var metaData = MetadataCache.AddOrUpdate( typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 + addValueFactory: t => tmpMetadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var metadata = BuildDeviceMetadata(columns, accessor); //对现有值 existingValue 进行修改,返回新值 + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } existingValue.ColumnNames = metadata.ColumnNames; + existingValue.DataTypes = metadata.DataTypes; return existingValue; } ); - metadata.EntityType = accessor.EntityType; + //var metaData = MetadataCache.GetOrAdd(typeof(T), type => + //{ + // var columns = CollectColumnMetadata(accessor); + // var metadata = BuildDeviceMetadata(columns, accessor); + // string tableNameOrTreePath = string.Empty; + // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + // if (tableNameOrTreePathAttribute != null) + // { + // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + // } + // metadata.EntityName = accessor.EntityName; + // metadata.EntityType = accessor.EntityType; + // metadata.TableNameOrTreePath = tableNameOrTreePath; + // return metadata; + //}); return await Task.FromResult(metaData); } @@ -257,31 +315,26 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 表实体 /// 设备元数据 /// - private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity + private List BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { - var timestamps = new List(); - var values = new List>(); - var devicePaths = new HashSet(); - List tempColumnNames = new List(); - tempColumnNames.AddRange(metadata.ColumnNames); - - var accessor = SourceEntityAccessorFactory.GetAccessor(); - - var memberCache = new Dictionary(); // 缓存优化查询 - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) + var entitiyList = entities.ToList(); + if (entitiyList == null || entitiyList.Count <= 0) { - memberCache[member.NameOrPath] = member; + return null; } - if (accessor.EntityType == null || metadata.EntityType == null) + //var accessor = SourceEntityAccessorFactory.GetAccessor(); + + //var memberCache = BuildMemberCache(accessor); + + if (metadata.EntityType == null) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); } - if (metadata.EntityType != accessor.EntityType) + if (metadata.EntityType == EntityTypeEnum.Other) { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体,属于异常情况,-102"); } if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) @@ -292,103 +345,90 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); } - string tableNameOrTreePath = string.Empty; - var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); - if (tableNameOrTreePathAttribute != null) + if (_runtimeContext.UseTableSessionPool)//表模型 { - tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; - } - - foreach (var entity in entities) - { - timestamps.Add(entity.Timestamps); - var rowValues = new List(); - - foreach (var measurement in tempColumnNames) - { - if (!memberCache.TryGetValue(measurement, out var member)) - { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息,-105"); - } - - var value = member.GetValue(entity); - - // 特性查询优化 - var attributes = member.CustomAttributes ?? Enumerable.Empty(); - var singleMeasuringAttr = attributes.OfType().FirstOrDefault(); - if (singleMeasuringAttr != null)//如果是单侧点 - { - var tupleItemKey = $"{member.NameOrPath}.Item2"; - if (!memberCache.TryGetValue(tupleItemKey, out var tupleMember)) - { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-106"); - } - - value = tupleMember.GetValue(entity); - } - - if (value != null) - { - var tempValue = member.DeclaredTypeName.ToUpper() switch - { - "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - _ => value - }; - - rowValues.Add(tempValue); - } - else - { - rowValues.Add(value); - } - } - - values.Add(rowValues); - //如果指定了路径 - if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) + if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)) { - devicePaths.Add(tableNameOrTreePath); + tableNameOrTreePath = metadata.TableNameOrTreePath; } else { - if (!_runtimeContext.UseTableSessionPool)//树模型 + tableNameOrTreePath = DevicePathBuilder.GetTableName(); + } + + return new List() { BuildTablet(entitiyList, metadata, tableNameOrTreePath) }; + } + else + { + //树模型的时候,实体的设备Id可能会不同,因此需要根据不同路径进行存储。 + var tabletList = new List(); + var groupEntities = entitiyList.GroupBy(d => d.DevicePath).ToList(); + foreach (var group in groupEntities) + { + tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key)); + } + + return tabletList; + } + } + + private Tablet BuildTablet(List entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity + { + // 预分配内存结构 + var rowCount = entities.Count; + var timestamps = new long[rowCount]; + var values = new object[rowCount][]; + for (var i = 0; i < values.Length; i++) + { + values[i] = new object[metadata.ColumnNames.Count]; + } + + List tempColumnNames = new List(); + tempColumnNames.AddRange(metadata.ColumnNames); + + // 顺序处理数据(保证线程安全) + for (var row = 0; row < rowCount; row++) + { + var entity = entities[row]; + timestamps[row] = entity.Timestamps; + + for (int i = 0; i < metadata.ColumnNames.Count; i++) + { + var processor = metadata.Processors[i]; + if (processor.IsSingleMeasuring) { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); - } - else - { - devicePaths.Add(DevicePathBuilder.GetTableName()); + tempColumnNames[i] = (string)processor.SingleMeasuringNameGetter(entity); } + + // 获取并转换值 + values[row][i] = processor.ValueGetter(entity); } } - if (devicePaths.Count > 1) - { - throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); - } - return _runtimeContext.UseTableSessionPool - ? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps) - : BuildSessionTablet(metadata, devicePaths.First(), values, timestamps); + ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()) + : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()); } + /// /// 构建tree模型的Tablet /// /// 已解析的设备数据元数据 /// 设备路径 + /// 数据列集合 /// 数据集合 /// 时间戳集合 /// - private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List columns, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 return new Tablet( devicePath, - metadata.ColumnNames, + columns, metadata.DataTypes, values, timestamps @@ -400,14 +440,15 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 已解析的设备数据元数据 /// 表名称 + /// 数据列集合 /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns, List> values, List timestamps) { var tablet = new Tablet( tableName, - metadata.ColumnNames, + columns, metadata.ColumnCategories, metadata.DataTypes, values, @@ -535,13 +576,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var accessor = SourceEntityAccessorFactory.GetAccessor(); - var memberCache = new Dictionary(); // 缓存优化查询 - - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) - { - memberCache[member.NameOrPath] = member; - } + var memberCache = BuildMemberCache(accessor); var columns = new List() { "Timestamps" }; var dataTypes = new List() { TSDataType.TIMESTAMP }; @@ -596,13 +631,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private List CollectColumnMetadata(ISourceEntityAccessor accessor) { var columns = new List(); - var memberCache = new Dictionary(); // 缓存优化查询 - - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) - { - memberCache[member.NameOrPath] = member; - } + var memberCache = BuildMemberCache(accessor); foreach (var member in accessor.MemberList) { @@ -625,15 +654,15 @@ namespace JiShe.CollectBus.IoTDB.Provider ColumnInfo? column = null; if (tagAttr != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (attrColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (fieldColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } // 单测模式处理 @@ -644,7 +673,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常"); } - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true, tupleMember.DeclaredTypeName); } if (column.HasValue) columns.Add(column.Value); @@ -658,7 +687,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 待解析的类 /// 已处理好的数据列 /// - private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity + private DeviceMetadata BuildDeviceMetadata(List columns, ISourceEntityAccessor accessor) where T : IoTEntity { var metadata = new DeviceMetadata(); @@ -677,9 +706,97 @@ namespace JiShe.CollectBus.IoTDB.Provider ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); + // 新增处理器初始化 + foreach (var item in metadata.ColumnNames) + { + ColumnInfo column = columns.FirstOrDefault(d => d.Name == item); + + var processor = new ColumnProcessor + { + ColumnName = column.Name, + IsSingleMeasuring = column.IsSingleMeasuring, + Converter = CreateConverter(column.DeclaredTypeName.ToUpper()) + }; + + // 处理单测点 + if (column.IsSingleMeasuring) + { + var item1Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item1"); + + processor.SingleMeasuringNameGetter = (obj) => + { + // 获取原始值并转为字符串 + object rawValue = item1Member.Getter(obj); + string value = rawValue?.ToString(); + + if (!string.IsNullOrEmpty(value)) + { + // 规则1: 严格检查ASCII字母和数字(0-9, A-Z, a-z) + bool hasInvalidChars = value.Any(c => + !((c >= 'A' && c <= 'Z') || + (c >= 'a' && c <= 'z') || + (c >= '0' && c <= '9'))); + + // 规则2: 首字符不能是数字 + bool startsWithDigit = value[0] >= '0' && value[0] <= '9'; + + // 规则3: 全字符串不能都是数字 + bool allDigits = value.All(c => c >= '0' && c <= '9'); + + // 按优先级抛出具体异常 + if (hasInvalidChars) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字"); + } + else if (startsWithDigit) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能以数字开头"); + } + else if (allDigits) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能全为数字"); + } + } + + return value; + }; + + var item2Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item2"); + processor.ValueGetter = (obj) => { + object rawValue = item2Member.Getter(obj); + return processor.Converter(rawValue); + }; + } + else + { + // 获取对应的成员访问器 + var member = accessor.MemberList.First(m => m.NameOrPath == column.Name); + processor.ValueGetter = (obj) => { + object rawValue = member.Getter(obj); + return processor.Converter(rawValue); + }; + } + + metadata.Processors.Add(processor); + } + return metadata; } + private Func CreateConverter(string declaredTypeName) + { + return declaredTypeName switch + { + "DATETIME" => value => ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value => value + }; + } + /// /// 处理不同列类型的逻辑 /// @@ -706,6 +823,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public string Name { get; } + /// + /// 声明的类型的名称 + /// + public string DeclaredTypeName { get; } + /// /// 是否是单测点 /// @@ -721,12 +843,13 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public TSDataType DataType { get; } - public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) + public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName) { Name = name; Category = category; DataType = dataType; IsSingleMeasuring = isSingleMeasuring; + DeclaredTypeName = declaredTypeName; } } @@ -807,5 +930,23 @@ namespace JiShe.CollectBus.IoTDB.Provider TSDataType.STRING => Convert.ToString(value), _ => Convert.ToString(value) }; + + /// + /// 缓存实体属性信息 + /// + /// + /// + /// + private Dictionary BuildMemberCache(ISourceEntityAccessor accessor) + { + var cache = new Dictionary(StringComparer.Ordinal); + foreach (var member in accessor.MemberList) + { + cache[member.NameOrPath] = member; + } + return cache; + } + + private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled); } } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index d6f9e04..c24ad18 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -140,13 +140,13 @@ namespace JiShe.CollectBus.DataChannels // 批量写入数据库 await _dbProvider.BatchInsertAsync(metadata, records); - //// 限流推送Kafka - //await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: records, - // deviceIdSelector: data => data.DeviceId, - // processor: async (data, groupIndex) => - // await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) - //); + // 限流推送Kafka + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: records, + deviceIdSelector: data => data.DeviceId, + processor: async (data, groupIndex) => + await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) + ); } catch (Exception ex) { diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 843c6f7..0009e20 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -184,14 +184,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS time = DateTime.Now; //System.Reflection.PropertyInfo; //System.Reflection.FieldInfo - var meter = new TreeModelSingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", DeviceType = "1", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), - SingleMeasuring = (measuring, value) + SingleMeasuring = (measuring, time) }; await _iotDBProvider.InsertAsync(meter); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 32a1d7c..9d45f08 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -7,8 +7,9 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.DataChannels; +using JiShe.CollectBus.DataMigration.Options; using JiShe.CollectBus.GatherItem; -using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; @@ -17,8 +18,8 @@ using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; -using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Interfaces; +using JiShe.CollectBus.Protocol.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -26,14 +27,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; -using JiShe.CollectBus.Protocol.Models; -using System.Threading.Channels; -using static IdentityModel.ClaimComparer; -using JiShe.CollectBus.DataChannels; -using JiShe.CollectBus.DataMigration.Options; -using static System.Runtime.InteropServices.JavaScript.JSType; -using static System.Formats.Asn1.AsnWriter; -using System.Threading; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -329,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - return; + //return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); @@ -744,7 +737,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ItemCode = tempItem, DataTimeMark = new Protocol.DataTimeMark() { - Density = ammeterInfo.TimeDensity,//todo 转换成协议的值 + Density = ammeterInfo.TimeDensity.GetDensity(),//转换成协议的值 Point = 1, DataTime = timestamps, } @@ -1354,7 +1347,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } List taskList = new List(); - var metadata = await _dbProvider.GetMetadata(); var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H; //var subItemCode = T6452007PacketItemCodeConst.C08; @@ -1643,37 +1635,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) { - string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); - return new MeterReadingTelemetryPacketInfo() + try { - SystemName = SystemType, - ProjectId = $"{ammeterInfo.ProjectID}", - DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.MeterId}", - Timestamps = timestamps, - DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, - CreationTime = creationTime, - MeterAddress = ammeterInfo.AmmerterAddress, - PacketType = (int)packetType, - AFN = builderResponse.AFn, - Fn = builderResponse.Fn, - Seq = builderResponse.Seq, - MSA = builderResponse.MSA, - FocusId = ammeterInfo.FocusId, - FocusAddress = ammeterInfo.FocusAddress, - ItemCode = itemCode, - SubItemCode = subItemCode, - TaskMark = taskMark, - IsSend = false, - ManualOrNot = false, - Pn = ammeterInfo.MeteringCode, - IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), - IsReceived = false, - ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), - }; - } + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); + return new MeterReadingTelemetryPacketInfo() + { + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MeterId}", + Timestamps = timestamps, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = creationTime, + MeterAddress = ammeterInfo.AmmerterAddress, + PacketType = (int)packetType, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, + MSA = builderResponse.MSA, + FocusId = ammeterInfo.FocusId, + FocusAddress = ammeterInfo.FocusAddress, + ItemCode = itemCode, + SubItemCode = subItemCode, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = Guid.NewGuid().ToString(), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, + ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), + }; + } + catch (Exception ex) + { + throw ex; + } + } #endregion } diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 648c1aa..e3177a1 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -859,6 +859,21 @@ namespace JiShe.CollectBus.Common.Helpers } - + /// + /// 采集频率转换为集中器采集密度 + /// + /// + /// + public static int GetDensity(this int timeDensity) => + timeDensity switch + { + 0 => 0,//无 + 1 => 255,//1分钟 + 5 => 245,//5分钟 + 15 => 1,//15分钟 + 30 => 2,//30分钟 + 60 => 3,//60分钟 + _ => -1//采集项本身无密度位 + }; } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 30e91e8..9509b08 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -15,8 +15,7 @@ - 后端服务 - + 后端服务 diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 7034d26..43f1723 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -141,7 +141,7 @@ } }, "ServerApplicationOptions": { - "ServerTagName": "JiSheCollectBus99", + "ServerTagName": "JiSheCollectBus4", "SystemType": "Energy", "FirstCollectionTime": "2025-04-28 15:07:00", "AutomaticVerificationTime": "16:07:00",