From 541c118dbbe06600f9546eeb3c7b17e8ef369e37 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 8 May 2025 22:44:01 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8DIoTDB=E9=A9=B1=E5=8A=A8?= =?UTF-8?q?=E9=80=82=E9=85=8D=E5=A2=9E=E9=87=8F=E6=BA=90=E7=A0=81=E7=94=9F?= =?UTF-8?q?=E6=88=90=E5=99=A8=E4=BB=A5=E5=90=8E=EF=BC=8C=E5=8D=95=E4=BE=A7?= =?UTF-8?q?=E7=82=B9=E6=A8=A1=E5=BC=8F=E5=A4=B1=E6=95=88=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Exceptions/IoTException.cs | 22 + .../JiShe.CollectBus.IoTDB.csproj | 3 +- .../Provider/IoTDBProvider.cs | 430 +++++++++++++----- .../BasicScheduledMeterReadingService.cs | 20 +- .../Helpers/CommonHelper.cs | 17 +- .../Pages/Monitor.cshtml | 3 +- web/JiShe.CollectBus.Host/appsettings.json | 2 +- 7 files changed, 370 insertions(+), 127 deletions(-) create mode 100644 modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs diff --git a/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs b/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs new file mode 100644 index 0000000..93bed4f --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Exceptions/IoTException.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDB.Exceptions +{ + /// + /// IoTDB异常 + /// + public class IoTException : Exception + { + public int ErrorCode { get; } + + public IoTException(string message, int errorCode) + : base($"{message} (Code: {errorCode})") + { + ErrorCode = errorCode; + } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj index 3911399..78b81e3 100644 --- a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj +++ b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj @@ -14,7 +14,6 @@ - + diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index bacbf61..db86791 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -21,6 +21,7 @@ using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using JiShe.CollectBus.Analyzers.Shared; +using JiShe.CollectBus.IoTDB.Exceptions; namespace JiShe.CollectBus.IoTDB.Provider { @@ -250,128 +251,337 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + ///// + ///// 构建Tablet + ///// + ///// + ///// 表实体 + ///// 设备元数据 + ///// + //private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity + //{ + // var timestamps = new List(); + // var values = new List>(); + // var devicePaths = new HashSet(); + // List tempColumnNames = new List(); + // tempColumnNames.AddRange(metadata.ColumnNames); + + // var accessor = SourceEntityAccessorFactory.GetAccessor(); + + // var memberCache = new Dictionary(); // 缓存优化查询 + // // 预构建成员缓存(Key: NameOrPath) + // foreach (var member in accessor.MemberList) + // { + // memberCache[member.NameOrPath] = member; + // } + + // if (accessor.EntityType == null || metadata.EntityType == null) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + // } + + // if (metadata.EntityType != accessor.EntityType) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); + // } + + // if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + // } + // else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + // } + + // string tableNameOrTreePath = string.Empty; + // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + // if (tableNameOrTreePathAttribute != null) + // { + // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + // } + + // foreach (var entity in entities) + // { + // timestamps.Add(entity.Timestamps); + // var rowValues = new List(); + + // foreach (var measurement in metadata.ColumnNames) + // { + // if (!memberCache.TryGetValue(measurement, out var member)) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息,-105"); + // } + + // var value = member.GetValue(entity); + + // // 特性查询优化 + // var attributes = member.CustomAttributes ?? Enumerable.Empty(); + // var singleMeasuringAttr = attributes.OfType().FirstOrDefault(); + // if (singleMeasuringAttr != null)//如果是单侧点 + // { + + // var tupleItem1Key = $"{member.NameOrPath}.Item1"; + // if (!memberCache.TryGetValue(tupleItem1Key, out var tuple1Member)) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item1 信息,-106"); + // } + // int indexOf = metadata.ColumnNames.IndexOf(measurement); + // tempColumnNames[indexOf] = (string)tuple1Member.GetValue(entity); + + // var tupleItem2Key = $"{member.NameOrPath}.Item2"; + // if (!memberCache.TryGetValue(tupleItem2Key, out var tuple2Member)) + // { + // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-107"); + // } + + // value = tuple2Member.GetValue(entity); + // } + + // if (value != null) + // { + // var tempValue = member.DeclaredTypeName.ToUpper() switch + // { + // "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + // _ => value + // }; + + // rowValues.Add(tempValue); + // } + // else + // { + // rowValues.Add(value); + // } + // } + + // values.Add(rowValues); + + // //如果指定了路径 + // if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) + // { + // devicePaths.Add(tableNameOrTreePath); + // } + // else + // { + // if (!_runtimeContext.UseTableSessionPool)//树模型 + // { + // devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + // } + // else + // { + // devicePaths.Add(DevicePathBuilder.GetTableName()); + // } + // } + // } + + // if (devicePaths.Count > 1) + // { + // throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); + // } + + // return _runtimeContext.UseTableSessionPool + // ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) + // : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames,values, timestamps); + //} + /// /// 构建Tablet /// /// - /// 表实体 + /// 表实体集合 /// 设备元数据 /// private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { - var timestamps = new List(); - var values = new List>(); - var devicePaths = new HashSet(); - List tempColumnNames = new List(); - tempColumnNames.AddRange(metadata.ColumnNames); - - var accessor = SourceEntityAccessorFactory.GetAccessor(); + // 前置校验 + ValidateMetadataAndAccessor(metadata, out var accessor); - var memberCache = new Dictionary(); // 缓存优化查询 - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) + // 初始化数据结构 + var (timestamps, values, devicePaths) = (new List(), new List>(), new HashSet()); + var tempColumnNames = new List(metadata.ColumnNames); + var memberCache = BuildMemberCache(accessor); + var tableNameOrTreePath = GetTableNameOrTreePath(); + + // 处理每个实体 + foreach (var entity in entities) { - memberCache[member.NameOrPath] = member; + ProcessEntity(entity, accessor, metadata, memberCache, tempColumnNames, timestamps, values); + UpdateDevicePaths(entity, tableNameOrTreePath, devicePaths); } + // 后置校验与返回 + ValidateDevicePaths(devicePaths); + // return CreateFinalTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); + return _runtimeContext.UseTableSessionPool + ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) + : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); + } + + private void ValidateMetadataAndAccessor(DeviceMetadata metadata, out ISourceEntityAccessor accessor) where T : IoTEntity + { + accessor = SourceEntityAccessorFactory.GetAccessor(); + if (accessor.EntityType == null || metadata.EntityType == null) { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType未指定", -101); } if (metadata.EntityType != accessor.EntityType) { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType不一致", -102); } - if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + bool isTableModel = accessor.EntityType == EntityTypeEnum.TableModel; + if (_runtimeContext.UseTableSessionPool != isTableModel) { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,Session类型不匹配: 预期{(isTableModel ? "Table" : "Tree")}模型", isTableModel ? -104 : -103); } - else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + } + + /// + /// 处理实体并获取值 + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + private void ProcessEntity( + T entity, + ISourceEntityAccessor accessor, + DeviceMetadata metadata, + Dictionary memberCache, + List tempColumnNames, + List timestamps, + List> values) where T : IoTEntity + { + timestamps.Add(entity.Timestamps); + var rowValues = new object[metadata.ColumnNames.Count]; + + Parallel.ForEach(metadata.ColumnNames, (measurement, state, index) => { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); - } - - string tableNameOrTreePath = string.Empty; - var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); - if (tableNameOrTreePathAttribute != null) - { - tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; - } - - foreach (var entity in entities) - { - timestamps.Add(entity.Timestamps); - var rowValues = new List(); - - foreach (var measurement in tempColumnNames) - { - if (!memberCache.TryGetValue(measurement, out var member)) - { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息,-105"); - } - - var value = member.GetValue(entity); - - // 特性查询优化 - var attributes = member.CustomAttributes ?? Enumerable.Empty(); - var singleMeasuringAttr = attributes.OfType().FirstOrDefault(); - if (singleMeasuringAttr != null)//如果是单侧点 - { - var tupleItemKey = $"{member.NameOrPath}.Item2"; - if (!memberCache.TryGetValue(tupleItemKey, out var tupleMember)) - { - throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-106"); - } - - value = tupleMember.GetValue(entity); - } - - if (value != null) - { - var tempValue = member.DeclaredTypeName.ToUpper() switch - { - "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - _ => value - }; - - rowValues.Add(tempValue); - } - else - { - rowValues.Add(value); - } - } - - values.Add(rowValues); - - //如果指定了路径 - if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) + if (!memberCache.TryGetValue(measurement, out var member)) { - devicePaths.Add(tableNameOrTreePath); + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,找不到成员: {measurement}", -105); } - else + + object value = ResolveMemberValue(entity, member, memberCache, tempColumnNames, (int)index); + rowValues[index] = ConvertValueByType(member, value); + }); + + values.Add(rowValues.ToList()); + } + + private object ResolveMemberValue( + T entity, + EntityMemberInfo member, + Dictionary memberCache, + List tempColumnNames, + int columnIndex) where T : IoTEntity + { + // 单测点逻辑 + if (member.CustomAttributes?.OfType().FirstOrDefault() is { } attr) + { + var tuple1Key = $"{member.NameOrPath}.Item1"; + var tuple2Key = $"{member.NameOrPath}.Item2"; + + if (!memberCache.TryGetValue(tuple1Key, out var tuple1) || !memberCache.TryGetValue(tuple2Key, out var tuple2)) { - if (!_runtimeContext.UseTableSessionPool)//树模型 - { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); - } - else - { - devicePaths.Add(DevicePathBuilder.GetTableName()); - } + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,单侧点元组成员缺失", -106); } + + tempColumnNames[columnIndex] = (string)tuple1.GetValue(entity); + return tuple2.GetValue(entity); + } + return member.GetValue(entity); + } + + /// + /// 设置实体的成员值 + /// + /// + /// + /// + private object ConvertValueByType(EntityMemberInfo member, object value) + { + return member.DeclaredTypeName switch + { + "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value + }; + } + + /// + /// 处理设备路径 + /// + /// + /// + /// + /// + private void UpdateDevicePaths( + T entity, + string tableNameOrTreePath, + HashSet devicePaths) where T : IoTEntity + { + if (!string.IsNullOrEmpty(tableNameOrTreePath)) + { + devicePaths.Add(tableNameOrTreePath); + return; + } + + var path = _runtimeContext.UseTableSessionPool + ? DevicePathBuilder.GetTableName() + : DevicePathBuilder.GetDevicePath(entity); + devicePaths.Add(path); + } + + /// + /// 验证设备路径 + /// + /// + private void ValidateDevicePaths(HashSet devicePaths) + { + if (devicePaths.Count == 0) + { + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径集合为空", -108); } if (devicePaths.Count > 1) { - throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); + var paths = string.Join(", ", devicePaths.Take(3)); + { + throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径不一致。检测到路径: {paths}...", -109); + } } + } - return _runtimeContext.UseTableSessionPool - ? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps) - : BuildSessionTablet(metadata, devicePaths.First(), values, timestamps); + /// + /// 缓存优化:避免重复反射 + /// + /// + /// + private string GetTableNameOrTreePath() + { + return AttributeCache.TableNameOrTreePath; + } + + /// + /// 特性缓存辅助类 + /// + /// + private static class AttributeCache + { + public static readonly string TableNameOrTreePath; + + static AttributeCache() + { + var attr = typeof(T).GetCustomAttribute(); + TableNameOrTreePath = attr?.TableNameOrTreePath ?? string.Empty; + } } /// @@ -379,16 +589,17 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 已解析的设备数据元数据 /// 设备路径 + /// 数据列集合 /// 数据集合 /// 时间戳集合 /// - private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List columns, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 return new Tablet( devicePath, - metadata.ColumnNames, + columns, metadata.DataTypes, values, timestamps @@ -400,14 +611,15 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 已解析的设备数据元数据 /// 表名称 + /// 数据列集合 /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns,List> values, List timestamps) { var tablet = new Tablet( tableName, - metadata.ColumnNames, + columns, metadata.ColumnCategories, metadata.DataTypes, values, @@ -535,13 +747,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var accessor = SourceEntityAccessorFactory.GetAccessor(); - var memberCache = new Dictionary(); // 缓存优化查询 - - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) - { - memberCache[member.NameOrPath] = member; - } + var memberCache = BuildMemberCache(accessor); var columns = new List() { "Timestamps" }; var dataTypes = new List() { TSDataType.TIMESTAMP }; @@ -596,13 +802,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private List CollectColumnMetadata(ISourceEntityAccessor accessor) { var columns = new List(); - var memberCache = new Dictionary(); // 缓存优化查询 - - // 预构建成员缓存(Key: NameOrPath) - foreach (var member in accessor.MemberList) - { - memberCache[member.NameOrPath] = member; - } + var memberCache = BuildMemberCache(accessor); foreach (var member in accessor.MemberList) { @@ -807,5 +1007,21 @@ namespace JiShe.CollectBus.IoTDB.Provider TSDataType.STRING => Convert.ToString(value), _ => Convert.ToString(value) }; + + /// + /// 缓存实体属性信息 + /// + /// + /// + /// + private Dictionary BuildMemberCache(ISourceEntityAccessor accessor) + { + var cache = new Dictionary(StringComparer.Ordinal); + foreach (var member in accessor.MemberList) + { + cache[member.NameOrPath] = member; + } + return cache; + } } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 32a1d7c..bab879d 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -7,8 +7,9 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.DataChannels; +using JiShe.CollectBus.DataMigration.Options; using JiShe.CollectBus.GatherItem; -using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; @@ -17,8 +18,8 @@ using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; -using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Interfaces; +using JiShe.CollectBus.Protocol.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -26,14 +27,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; -using JiShe.CollectBus.Protocol.Models; -using System.Threading.Channels; -using static IdentityModel.ClaimComparer; -using JiShe.CollectBus.DataChannels; -using JiShe.CollectBus.DataMigration.Options; -using static System.Runtime.InteropServices.JavaScript.JSType; -using static System.Formats.Asn1.AsnWriter; -using System.Threading; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -329,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - return; + //return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); @@ -744,7 +737,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ItemCode = tempItem, DataTimeMark = new Protocol.DataTimeMark() { - Density = ammeterInfo.TimeDensity,//todo 转换成协议的值 + Density = ammeterInfo.TimeDensity.GetDensity(),//转换成协议的值 Point = 1, DataTime = timestamps, } @@ -1354,7 +1347,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading } List taskList = new List(); - var metadata = await _dbProvider.GetMetadata(); var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H; //var subItemCode = T6452007PacketItemCodeConst.C08; @@ -1673,7 +1665,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IsReceived = false, ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), }; - } + } #endregion } diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 648c1aa..e3177a1 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -859,6 +859,21 @@ namespace JiShe.CollectBus.Common.Helpers } - + /// + /// 采集频率转换为集中器采集密度 + /// + /// + /// + public static int GetDensity(this int timeDensity) => + timeDensity switch + { + 0 => 0,//无 + 1 => 255,//1分钟 + 5 => 245,//5分钟 + 15 => 1,//15分钟 + 30 => 2,//30分钟 + 60 => 3,//60分钟 + _ => -1//采集项本身无密度位 + }; } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 30e91e8..9509b08 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -15,8 +15,7 @@ - 后端服务 - + 后端服务 diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 7034d26..43f1723 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -141,7 +141,7 @@ } }, "ServerApplicationOptions": { - "ServerTagName": "JiSheCollectBus99", + "ServerTagName": "JiSheCollectBus4", "SystemType": "Energy", "FirstCollectionTime": "2025-04-28 15:07:00", "AutomaticVerificationTime": "16:07:00", From f6130e1d0b32d5f3ae36de73079244a8e17c6118 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Fri, 9 May 2025 17:54:52 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BC=98=E5=8C=96IoTDB=E9=A9=B1=E5=8A=A8?= =?UTF-8?q?=EF=BC=8C=E5=AE=9E=E7=8E=B030=E4=B8=AA=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E5=AE=9E=E4=BD=93=E7=9A=84270W=E4=BB=BB=E5=8A=A1=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BC=80=E5=8F=91=E7=8E=AF=E5=A2=83=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=B9=B3=E5=9D=87=E8=80=97=E6=97=B6=E5=9C=A870=E7=A7=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ComplexTypeSourceAnalyzers.cs | 4 +- .../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 23 + .../Provider/DeviceMetadata.cs | 50 +- .../Provider/IoTDBProvider.cs | 593 ++++++++---------- .../DataChannels/DataChannelManageService.cs | 14 +- .../Samples/SampleAppService.cs | 4 +- .../BasicScheduledMeterReadingService.cs | 67 +- 7 files changed, 378 insertions(+), 377 deletions(-) diff --git a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs index bad9528..ecb9c78 100644 --- a/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs +++ b/modules/JiShe.CollectBus.Analyzers/ComplexTypeSourceAnalyzers.cs @@ -226,7 +226,7 @@ namespace JiShe.CollectBus.IncrementalGenerator code.AppendLine($" public string EntityName {{get;}} = \"{classSymbol.Name}\";"); // 添加 EntityType 属性 code.AppendLine($" public EntityTypeEnum? EntityType {{ get; }} = {entityTypeValue};"); - + foreach (var prop in propList) { // 安全类型转换 @@ -573,7 +573,7 @@ namespace JiShe.CollectBus.IncrementalGenerator $"new EntityMemberInfo(" + $"\"{prop.Name}.{elementName}\", " + $"typeof({elementType}), " + - $"\"{elementDeclaredName}\", " + + $"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " + $"(e) => Get{prop.Name}_{elementName}(({entityType})e), " + $"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))"); } diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index f2d55a5..9df2488 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.IoTDB.Attributes; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Model { @@ -43,5 +44,27 @@ namespace JiShe.CollectBus.IoTDB.Model /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + /// + /// 设备路径 + /// + public virtual string DevicePath + { + get + { + return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + set + { + if (string.IsNullOrWhiteSpace(value)) + { + DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`"; + } + else + { + DevicePath = value; + } + } + } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index f48d218..92c33ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -6,8 +6,18 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 设备元数据 /// - public class DeviceMetadata + public sealed class DeviceMetadata { + /// + /// 实体类名称 + /// + public string EntityName { get; set; } + + /// + /// 设备表名或树路径,如果实体没有添加TableNameOrTreePath,此处为空 + /// + public string TableNameOrTreePath { get; set; } + /// /// 实体类型枚举 /// @@ -31,6 +41,42 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 值类型集合,用于构建Table的值类型,也就是dataTypes参数 /// - public List DataTypes { get; } = new(); + public List DataTypes { get; set; } = new(); + + /// + /// 列处理信息集合 + /// + public List Processors { get; } = new List(); + } + + /// + /// 列处理信息结构 + /// + public struct ColumnProcessor + { + /// + /// 列名 + /// + public string ColumnName; + + /// + /// 值获取委托 + /// + public Func ValueGetter; + + /// + /// 类型转换委托 + /// + public Func Converter; + + /// + /// 是否单测点 + /// + public bool IsSingleMeasuring; + + /// + /// 单测点名称委托 + /// + public Func SingleMeasuringNameGetter; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index db86791..34b4f4b 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -22,6 +22,10 @@ using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using JiShe.CollectBus.Analyzers.Shared; using JiShe.CollectBus.IoTDB.Exceptions; +using System.Diagnostics.Metrics; +using Newtonsoft.Json.Linq; +using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Text.RegularExpressions; namespace JiShe.CollectBus.IoTDB.Provider { @@ -68,8 +72,13 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet.First()); } catch (Exception ex) { @@ -95,7 +104,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -123,7 +140,15 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, deviceMetadata); - await CurrentSession.InsertAsync(tablet); + if (tablet == null || tablet.Count <= 0) + { + _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null"); + return; + } + foreach (var item in tablet) + { + await CurrentSession.InsertAsync(item); + } } } catch (Exception ex) @@ -181,22 +206,54 @@ namespace JiShe.CollectBus.IoTDB.Provider var accessor = SourceEntityAccessorFactory.GetAccessor(); var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var tmpMetadata = BuildDeviceMetadata(columns, accessor); + + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } + tmpMetadata.EntityName = accessor.EntityName; + tmpMetadata.EntityType = accessor.EntityType; + tmpMetadata.TableNameOrTreePath = tableNameOrTreePath; + var metaData = MetadataCache.AddOrUpdate( typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 + addValueFactory: t => tmpMetadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { var columns = CollectColumnMetadata(accessor); - var metadata = BuildDeviceMetadata(columns); + var metadata = BuildDeviceMetadata(columns, accessor); //对现有值 existingValue 进行修改,返回新值 + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } existingValue.ColumnNames = metadata.ColumnNames; + existingValue.DataTypes = metadata.DataTypes; return existingValue; } ); - metadata.EntityType = accessor.EntityType; + //var metaData = MetadataCache.GetOrAdd(typeof(T), type => + //{ + // var columns = CollectColumnMetadata(accessor); + // var metadata = BuildDeviceMetadata(columns, accessor); + // string tableNameOrTreePath = string.Empty; + // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + // if (tableNameOrTreePathAttribute != null) + // { + // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + // } + // metadata.EntityName = accessor.EntityName; + // metadata.EntityType = accessor.EntityType; + // metadata.TableNameOrTreePath = tableNameOrTreePath; + // return metadata; + //}); return await Task.FromResult(metaData); } @@ -251,338 +308,110 @@ namespace JiShe.CollectBus.IoTDB.Provider } } - ///// - ///// 构建Tablet - ///// - ///// - ///// 表实体 - ///// 设备元数据 - ///// - //private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity - //{ - // var timestamps = new List(); - // var values = new List>(); - // var devicePaths = new HashSet(); - // List tempColumnNames = new List(); - // tempColumnNames.AddRange(metadata.ColumnNames); - - // var accessor = SourceEntityAccessorFactory.GetAccessor(); - - // var memberCache = new Dictionary(); // 缓存优化查询 - // // 预构建成员缓存(Key: NameOrPath) - // foreach (var member in accessor.MemberList) - // { - // memberCache[member.NameOrPath] = member; - // } - - // if (accessor.EntityType == null || metadata.EntityType == null) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); - // } - - // if (metadata.EntityType != accessor.EntityType) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); - // } - - // if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); - // } - // else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); - // } - - // string tableNameOrTreePath = string.Empty; - // var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); - // if (tableNameOrTreePathAttribute != null) - // { - // tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; - // } - - // foreach (var entity in entities) - // { - // timestamps.Add(entity.Timestamps); - // var rowValues = new List(); - - // foreach (var measurement in metadata.ColumnNames) - // { - // if (!memberCache.TryGetValue(measurement, out var member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息,-105"); - // } - - // var value = member.GetValue(entity); - - // // 特性查询优化 - // var attributes = member.CustomAttributes ?? Enumerable.Empty(); - // var singleMeasuringAttr = attributes.OfType().FirstOrDefault(); - // if (singleMeasuringAttr != null)//如果是单侧点 - // { - - // var tupleItem1Key = $"{member.NameOrPath}.Item1"; - // if (!memberCache.TryGetValue(tupleItem1Key, out var tuple1Member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item1 信息,-106"); - // } - // int indexOf = metadata.ColumnNames.IndexOf(measurement); - // tempColumnNames[indexOf] = (string)tuple1Member.GetValue(entity); - - // var tupleItem2Key = $"{member.NameOrPath}.Item2"; - // if (!memberCache.TryGetValue(tupleItem2Key, out var tuple2Member)) - // { - // throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-107"); - // } - - // value = tuple2Member.GetValue(entity); - // } - - // if (value != null) - // { - // var tempValue = member.DeclaredTypeName.ToUpper() switch - // { - // "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - // _ => value - // }; - - // rowValues.Add(tempValue); - // } - // else - // { - // rowValues.Add(value); - // } - // } - - // values.Add(rowValues); - - // //如果指定了路径 - // if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) - // { - // devicePaths.Add(tableNameOrTreePath); - // } - // else - // { - // if (!_runtimeContext.UseTableSessionPool)//树模型 - // { - // devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); - // } - // else - // { - // devicePaths.Add(DevicePathBuilder.GetTableName()); - // } - // } - // } - - // if (devicePaths.Count > 1) - // { - // throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); - // } - - // return _runtimeContext.UseTableSessionPool - // ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) - // : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames,values, timestamps); - //} - /// /// 构建Tablet /// /// - /// 表实体集合 + /// 表实体 /// 设备元数据 /// - private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity + private List BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { - // 前置校验 - ValidateMetadataAndAccessor(metadata, out var accessor); - - // 初始化数据结构 - var (timestamps, values, devicePaths) = (new List(), new List>(), new HashSet()); - var tempColumnNames = new List(metadata.ColumnNames); - var memberCache = BuildMemberCache(accessor); - var tableNameOrTreePath = GetTableNameOrTreePath(); - - // 处理每个实体 - foreach (var entity in entities) + var entitiyList = entities.ToList(); + if (entitiyList == null || entitiyList.Count <= 0) { - ProcessEntity(entity, accessor, metadata, memberCache, tempColumnNames, timestamps, values); - UpdateDevicePaths(entity, tableNameOrTreePath, devicePaths); + return null; + } + + //var accessor = SourceEntityAccessorFactory.GetAccessor(); + + //var memberCache = BuildMemberCache(accessor); + + if (metadata.EntityType == null) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + } + + if (metadata.EntityType == EntityTypeEnum.Other) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体,属于异常情况,-102"); + } + + if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + } + else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + } + string tableNameOrTreePath = string.Empty; + if (_runtimeContext.UseTableSessionPool)//表模型 + { + //如果指定了路径 + if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)) + { + tableNameOrTreePath = metadata.TableNameOrTreePath; + } + else + { + tableNameOrTreePath = DevicePathBuilder.GetTableName(); + } + + return new List() { BuildTablet(entitiyList, metadata, tableNameOrTreePath) }; + } + else + { + //树模型的时候,实体的设备Id可能会不同,因此需要根据不同路径进行存储。 + var tabletList = new List(); + var groupEntities = entitiyList.GroupBy(d => d.DevicePath).ToList(); + foreach (var group in groupEntities) + { + tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key)); + } + + return tabletList; + } + } + + private Tablet BuildTablet(List entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity + { + // 预分配内存结构 + var rowCount = entities.Count; + var timestamps = new long[rowCount]; + var values = new object[rowCount][]; + for (var i = 0; i < values.Length; i++) + { + values[i] = new object[metadata.ColumnNames.Count]; + } + + List tempColumnNames = new List(); + tempColumnNames.AddRange(metadata.ColumnNames); + + // 顺序处理数据(保证线程安全) + for (var row = 0; row < rowCount; row++) + { + var entity = entities[row]; + timestamps[row] = entity.Timestamps; + + for (int i = 0; i < metadata.ColumnNames.Count; i++) + { + var processor = metadata.Processors[i]; + if (processor.IsSingleMeasuring) + { + tempColumnNames[i] = (string)processor.SingleMeasuringNameGetter(entity); + } + + // 获取并转换值 + values[row][i] = processor.ValueGetter(entity); + } } - // 后置校验与返回 - ValidateDevicePaths(devicePaths); - // return CreateFinalTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); return _runtimeContext.UseTableSessionPool - ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps) - : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps); + ? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()) + : BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList()); } - private void ValidateMetadataAndAccessor(DeviceMetadata metadata, out ISourceEntityAccessor accessor) where T : IoTEntity - { - accessor = SourceEntityAccessorFactory.GetAccessor(); - - if (accessor.EntityType == null || metadata.EntityType == null) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType未指定", -101); - } - - if (metadata.EntityType != accessor.EntityType) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,EntityType不一致", -102); - } - - bool isTableModel = accessor.EntityType == EntityTypeEnum.TableModel; - if (_runtimeContext.UseTableSessionPool != isTableModel) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,Session类型不匹配: 预期{(isTableModel ? "Table" : "Tree")}模型", isTableModel ? -104 : -103); - } - } - - /// - /// 处理实体并获取值 - /// - /// - /// - /// - /// - /// - /// - /// - /// - /// - private void ProcessEntity( - T entity, - ISourceEntityAccessor accessor, - DeviceMetadata metadata, - Dictionary memberCache, - List tempColumnNames, - List timestamps, - List> values) where T : IoTEntity - { - timestamps.Add(entity.Timestamps); - var rowValues = new object[metadata.ColumnNames.Count]; - - Parallel.ForEach(metadata.ColumnNames, (measurement, state, index) => - { - if (!memberCache.TryGetValue(measurement, out var member)) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,找不到成员: {measurement}", -105); - } - - object value = ResolveMemberValue(entity, member, memberCache, tempColumnNames, (int)index); - rowValues[index] = ConvertValueByType(member, value); - }); - - values.Add(rowValues.ToList()); - } - - private object ResolveMemberValue( - T entity, - EntityMemberInfo member, - Dictionary memberCache, - List tempColumnNames, - int columnIndex) where T : IoTEntity - { - // 单测点逻辑 - if (member.CustomAttributes?.OfType().FirstOrDefault() is { } attr) - { - var tuple1Key = $"{member.NameOrPath}.Item1"; - var tuple2Key = $"{member.NameOrPath}.Item2"; - - if (!memberCache.TryGetValue(tuple1Key, out var tuple1) || !memberCache.TryGetValue(tuple2Key, out var tuple2)) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,单侧点元组成员缺失", -106); - } - - tempColumnNames[columnIndex] = (string)tuple1.GetValue(entity); - return tuple2.GetValue(entity); - } - return member.GetValue(entity); - } - - /// - /// 设置实体的成员值 - /// - /// - /// - /// - private object ConvertValueByType(EntityMemberInfo member, object value) - { - return member.DeclaredTypeName switch - { - "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), - _ => value - }; - } - - /// - /// 处理设备路径 - /// - /// - /// - /// - /// - private void UpdateDevicePaths( - T entity, - string tableNameOrTreePath, - HashSet devicePaths) where T : IoTEntity - { - if (!string.IsNullOrEmpty(tableNameOrTreePath)) - { - devicePaths.Add(tableNameOrTreePath); - return; - } - - var path = _runtimeContext.UseTableSessionPool - ? DevicePathBuilder.GetTableName() - : DevicePathBuilder.GetDevicePath(entity); - devicePaths.Add(path); - } - - /// - /// 验证设备路径 - /// - /// - private void ValidateDevicePaths(HashSet devicePaths) - { - if (devicePaths.Count == 0) - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径集合为空", -108); - } - - if (devicePaths.Count > 1) - { - var paths = string.Join(", ", devicePaths.Take(3)); - { - throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时,设备路径不一致。检测到路径: {paths}...", -109); - } - } - } - - /// - /// 缓存优化:避免重复反射 - /// - /// - /// - private string GetTableNameOrTreePath() - { - return AttributeCache.TableNameOrTreePath; - } - - /// - /// 特性缓存辅助类 - /// - /// - private static class AttributeCache - { - public static readonly string TableNameOrTreePath; - - static AttributeCache() - { - var attr = typeof(T).GetCustomAttribute(); - TableNameOrTreePath = attr?.TableNameOrTreePath ?? string.Empty; - } - } /// /// 构建tree模型的Tablet @@ -615,7 +444,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns,List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List columns, List> values, List timestamps) { var tablet = new Tablet( tableName, @@ -802,7 +631,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private List CollectColumnMetadata(ISourceEntityAccessor accessor) { var columns = new List(); - var memberCache = BuildMemberCache(accessor); + var memberCache = BuildMemberCache(accessor); foreach (var member in accessor.MemberList) { @@ -825,15 +654,15 @@ namespace JiShe.CollectBus.IoTDB.Provider ColumnInfo? column = null; if (tagAttr != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (attrColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } else if (fieldColumn != null) { - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName); } // 单测模式处理 @@ -844,7 +673,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常"); } - column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true); + column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true, tupleMember.DeclaredTypeName); } if (column.HasValue) columns.Add(column.Value); @@ -858,7 +687,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 待解析的类 /// 已处理好的数据列 /// - private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity + private DeviceMetadata BuildDeviceMetadata(List columns, ISourceEntityAccessor accessor) where T : IoTEntity { var metadata = new DeviceMetadata(); @@ -877,9 +706,97 @@ namespace JiShe.CollectBus.IoTDB.Provider ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); + // 新增处理器初始化 + foreach (var item in metadata.ColumnNames) + { + ColumnInfo column = columns.FirstOrDefault(d => d.Name == item); + + var processor = new ColumnProcessor + { + ColumnName = column.Name, + IsSingleMeasuring = column.IsSingleMeasuring, + Converter = CreateConverter(column.DeclaredTypeName.ToUpper()) + }; + + // 处理单测点 + if (column.IsSingleMeasuring) + { + var item1Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item1"); + + processor.SingleMeasuringNameGetter = (obj) => + { + // 获取原始值并转为字符串 + object rawValue = item1Member.Getter(obj); + string value = rawValue?.ToString(); + + if (!string.IsNullOrEmpty(value)) + { + // 规则1: 严格检查ASCII字母和数字(0-9, A-Z, a-z) + bool hasInvalidChars = value.Any(c => + !((c >= 'A' && c <= 'Z') || + (c >= 'a' && c <= 'z') || + (c >= '0' && c <= '9'))); + + // 规则2: 首字符不能是数字 + bool startsWithDigit = value[0] >= '0' && value[0] <= '9'; + + // 规则3: 全字符串不能都是数字 + bool allDigits = value.All(c => c >= '0' && c <= '9'); + + // 按优先级抛出具体异常 + if (hasInvalidChars) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字"); + } + else if (startsWithDigit) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能以数字开头"); + } + else if (allDigits) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能全为数字"); + } + } + + return value; + }; + + var item2Member = accessor.MemberList + .First(m => m.NameOrPath == $"{column.Name}.Item2"); + processor.ValueGetter = (obj) => { + object rawValue = item2Member.Getter(obj); + return processor.Converter(rawValue); + }; + } + else + { + // 获取对应的成员访问器 + var member = accessor.MemberList.First(m => m.NameOrPath == column.Name); + processor.ValueGetter = (obj) => { + object rawValue = member.Getter(obj); + return processor.Converter(rawValue); + }; + } + + metadata.Processors.Add(processor); + } + return metadata; } + private Func CreateConverter(string declaredTypeName) + { + return declaredTypeName switch + { + "DATETIME" => value => ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds(), + _ => value => value + }; + } + /// /// 处理不同列类型的逻辑 /// @@ -906,6 +823,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public string Name { get; } + /// + /// 声明的类型的名称 + /// + public string DeclaredTypeName { get; } + /// /// 是否是单测点 /// @@ -921,12 +843,13 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public TSDataType DataType { get; } - public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) + public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName) { Name = name; Category = category; DataType = dataType; IsSingleMeasuring = isSingleMeasuring; + DeclaredTypeName = declaredTypeName; } } @@ -1023,5 +946,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } return cache; } + + private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled); } } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index d6f9e04..c24ad18 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -140,13 +140,13 @@ namespace JiShe.CollectBus.DataChannels // 批量写入数据库 await _dbProvider.BatchInsertAsync(metadata, records); - //// 限流推送Kafka - //await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - // items: records, - // deviceIdSelector: data => data.DeviceId, - // processor: async (data, groupIndex) => - // await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) - //); + // 限流推送Kafka + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: records, + deviceIdSelector: data => data.DeviceId, + processor: async (data, groupIndex) => + await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) + ); } catch (Exception ex) { diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 843c6f7..0009e20 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -184,14 +184,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS time = DateTime.Now; //System.Reflection.PropertyInfo; //System.Reflection.FieldInfo - var meter = new TreeModelSingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", DeviceType = "1", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), - SingleMeasuring = (measuring, value) + SingleMeasuring = (measuring, time) }; await _iotDBProvider.InsertAsync(meter); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index bab879d..9d45f08 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - //return; + //return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); @@ -1635,36 +1635,43 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) { - string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); - return new MeterReadingTelemetryPacketInfo() + try { - SystemName = SystemType, - ProjectId = $"{ammeterInfo.ProjectID}", - DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.MeterId}", - Timestamps = timestamps, - DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, - CreationTime = creationTime, - MeterAddress = ammeterInfo.AmmerterAddress, - PacketType = (int)packetType, - AFN = builderResponse.AFn, - Fn = builderResponse.Fn, - Seq = builderResponse.Seq, - MSA = builderResponse.MSA, - FocusId = ammeterInfo.FocusId, - FocusAddress = ammeterInfo.FocusAddress, - ItemCode = itemCode, - SubItemCode = subItemCode, - TaskMark = taskMark, - IsSend = false, - ManualOrNot = false, - Pn = ammeterInfo.MeteringCode, - IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), - IsReceived = false, - ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), - }; + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); + return new MeterReadingTelemetryPacketInfo() + { + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MeterId}", + Timestamps = timestamps, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = creationTime, + MeterAddress = ammeterInfo.AmmerterAddress, + PacketType = (int)packetType, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, + MSA = builderResponse.MSA, + FocusId = ammeterInfo.FocusId, + FocusAddress = ammeterInfo.FocusAddress, + ItemCode = itemCode, + SubItemCode = subItemCode, + TaskMark = taskMark, + IsSend = false, + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = Guid.NewGuid().ToString(), + IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), + IsReceived = false, + ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), + }; + } + catch (Exception ex) + { + throw ex; + } } #endregion