From cc59b327426819dcf3c00618e4a96d326123cd29 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 09:45:30 +0800 Subject: [PATCH 01/14] =?UTF-8?q?=E5=AE=8C=E5=96=84IoTDB=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AE=9E=E4=BD=93=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Attribute/EntityTypeAttribute.cs | 19 ++ .../Context/IoTDBRuntimeContext.cs | 2 +- .../EnumInfo/EntityTypeEnum.cs | 24 ++ .../Provider/DeviceMetadata.cs | 6 + .../Provider/DevicePathBuilder.cs | 2 +- .../Provider/IoTDBProvider.cs | 210 +++++++++++------- .../Provider/IoTEntity.cs | 10 +- .../RedisDataCache/RedisDataCacheService.cs | 2 + .../Samples/SampleAppService.cs | 3 +- .../BasicScheduledMeterReadingService.cs | 2 - .../Ammeters/ElectricityMeter.cs | 3 + 11 files changed, 192 insertions(+), 91 deletions(-) create mode 100644 modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs create mode 100644 modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs diff --git a/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs b/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs new file mode 100644 index 0000000..3610c00 --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs @@ -0,0 +1,19 @@ +using JiShe.CollectBus.IoTDB.Enums; + +namespace JiShe.CollectBus.IoTDB.Attribute +{ + /// + /// IoTDB实体类型特性 + /// + [AttributeUsage(AttributeTargets.Class)] + public class EntityTypeAttribute : System.Attribute + { + public EntityTypeEnum EntityType { get; } + + + public EntityTypeAttribute(EntityTypeEnum entityType) + { + EntityType = entityType; + } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index cd99b00..f321fa0 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDB.Context } /// - /// 是否使用表模型存储, 默认false,使用tree模型存储 + /// 存储模型切换标识,是否使用table模型存储, 默认为false,标识tree模型存储 /// public bool UseTableSessionPool { get; set; } diff --git a/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs b/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs new file mode 100644 index 0000000..26c6645 --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDB.Enums +{ + /// + /// IoTDB实体类型枚举 + /// + public enum EntityTypeEnum + { + /// + /// 树模型 + /// + TreeModel = 1, + + /// + /// 表模型 + /// + TableModel = 2, + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index 447f6ce..a093bb7 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -1,4 +1,5 @@ using Apache.IoTDB; +using JiShe.CollectBus.IoTDB.Enums; namespace JiShe.CollectBus.IoTDB.Provider { @@ -7,6 +8,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public class DeviceMetadata { + /// + /// IoTDB实体类型枚举 + /// + public EntityTypeEnum EntityType { get; set; } + /// /// 是否有单测量值 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index 46ce091..b78dfa3 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -13,7 +13,7 @@ /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9e18ac8..468506a 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.Reflection; using System.Text; using Apache.IoTDB; @@ -9,6 +10,7 @@ using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Logging; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Provider { @@ -139,66 +141,98 @@ namespace JiShe.CollectBus.IoTDB.Provider List tempColumnNames = new List(); tempColumnNames.AddRange(metadata.ColumnNames); - foreach (var entity in entities) + var entityTypeAttribute = typeof(T).GetCustomAttribute(); + + if (entityTypeAttribute == null) { - timestamps.Add(entity.Timestamps); - var rowValues = new List(); - foreach (var measurement in tempColumnNames) + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + } + + if (metadata.EntityType != entityTypeAttribute.EntityType) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); + } + + if(metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + } + else if (metadata.EntityType == Enums.EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + } + + + foreach (var entity in entities) { + timestamps.Add(entity.Timestamps); + var rowValues = new List(); - PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); - if (propertyInfo == null) + foreach (var measurement in tempColumnNames) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); - } - var value = propertyInfo.GetValue(entity); - if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && value != null)//表示当前对象是单测点模式 - { - Type tupleType = value.GetType(); - Type[] tupleArgs = tupleType.GetGenericArguments(); - Type item2Type = tupleArgs[1]; // T 的实际类型 - var item1 = tupleType.GetProperty("Item1")!.GetValue(value); - var item2 = tupleType.GetProperty("Item2")!.GetValue(value); - if (item1 == null || item2 == null) + PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); + if (propertyInfo == null) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); } - var indexOf = metadata.ColumnNames.IndexOf(measurement); - metadata.ColumnNames[indexOf] = (string)item1!; - - rowValues.Add(item2); - - } - else - { - if (value != null) + var value = propertyInfo.GetValue(entity); + if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式 { - rowValues.Add(value); + if (value != null) + { + Type tupleType = value.GetType(); + Type[] tupleArgs = tupleType.GetGenericArguments(); + Type item2Type = tupleArgs[1]; // T 的实际类型 + var item1 = tupleType.GetProperty("Item1")!.GetValue(value); + var item2 = tupleType.GetProperty("Item2")!.GetValue(value); + if (item1 == null || item2 == null) + { + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); + } + + var indexOf = metadata.ColumnNames.IndexOf(measurement); + metadata.ColumnNames[indexOf] = (string)item1!; + + rowValues.Add(item2); + } + else + { + rowValues.Add(null); + } } else { - //填充默认数据值 - DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); - rowValues.Add(defaultValue); + rowValues.Add(value); + + //if (value != null) + //{ + // rowValues.Add(value); + //} + //else + //{ + // ////填充默认数据值 + // //DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); + + // rowValues.Add(null); + //} } + } - } + values.Add(rowValues); - values.Add(rowValues); - - if (!_runtimeContext.UseTableSessionPool)//树模型 - { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + if (!_runtimeContext.UseTableSessionPool)//树模型 + { + devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + } + else + { + devicePaths.Add(DevicePathBuilder.GetTableName()); + } } - else - { - devicePaths.Add(DevicePathBuilder.GetTableName()); - } - } if (devicePaths.Count > 1) { @@ -213,14 +247,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建tree模型的Tablet /// - /// - /// - /// - /// + /// 已解析的设备数据元数据 + /// 设备路径 + /// 数据集合 + /// 时间戳集合 /// private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { + //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 + return new Tablet( devicePath, metadata.ColumnNames, @@ -233,16 +269,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建表模型的Tablet /// - /// - /// - /// - /// + /// 已解析的设备数据元数据 + /// 表名称 + /// 数据集合 + /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath, + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( - devicePath, + tableName, metadata.ColumnNames, metadata.ColumnCategories, metadata.DataTypes, @@ -393,33 +429,35 @@ namespace JiShe.CollectBus.IoTDB.Provider private DeviceMetadata GetMetadata() where T : IoTEntity { - var columns = CollectColumnMetadata(typeof(T)); - var metadata = BuildDeviceMetadata(columns); - - return _metadataCache.AddOrUpdate( - typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 - updateValueFactory: (t, existingValue) => + if (_runtimeContext.UseTableSessionPool)//表模型 + { + return _metadataCache.GetOrAdd(typeof(T), type => { - var columns = CollectColumnMetadata(t); - var metadata = BuildDeviceMetadata(columns); - - //对现有值 existingValue 进行修改,返回新值 - existingValue.ColumnNames = metadata.ColumnNames; - return existingValue; - } - ); + var columns = CollectColumnMetadata(type); + var metadata = BuildDeviceMetadata(columns); + return metadata; + }); + } + else + { + // 树模型 + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); - //return _metadataCache.GetOrAdd(typeof(T), type => - //{ - // var columns = CollectColumnMetadata(type); - // var metadata = BuildDeviceMetadata(columns); - // //if (metadata.IsSingleMeasuring) - // //{ - // // _metadataCache.Remove(typeof(T)); - // //} - // return metadata; - //}); + return _metadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => + { + var columns = CollectColumnMetadata(t); + var metadata = BuildDeviceMetadata(columns); + + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); + } } /// @@ -483,9 +521,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建设备元数据 /// - /// + /// 待解析的类 + /// 已处理好的数据列 /// - private DeviceMetadata BuildDeviceMetadata(List columns) + private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity { var metadata = new DeviceMetadata(); @@ -499,11 +538,20 @@ namespace JiShe.CollectBus.IoTDB.Provider var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); - + ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); + var entityTypeAttribute = typeof(T).GetCustomAttribute(); + + if (entityTypeAttribute == null) + { + throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)}的EntityType 没有指定,属于异常情况,-101"); + } + + metadata.EntityType = entityTypeAttribute.EntityType; + return metadata; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs index 39a584d..fc965e6 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs @@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 系统名称 /// [TAGColumn] - public string SystemName { get; set; } + public required string SystemName { get; set; } /// /// 项目编码 /// [TAGColumn] - public string ProjectCode { get; set; } + public required string ProjectCode { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// [TAGColumn] - public string DeviceType { get; set; } + public required string DeviceType { get; set; } /// /// 设备ID /// [TAGColumn] - public string DeviceId { get; set; } + public required string DeviceId { get; set; } /// /// 当前时间戳,单位毫秒 /// - public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 1a056a7..625c1e6 100644 --- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -42,6 +42,8 @@ namespace JiShe.CollectBus.RedisDataCache Instance = _freeRedisProvider.Instance; } + //todo 单个数据查询 + /// /// 单个添加数据 /// diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 23cfdfb..4970b7f 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -105,6 +105,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await _iotDBProvider.InsertAsync(meter2); _dbContext.UseTableSessionPool = true; + var testTime = Convert.ToDateTime("2025-04-21 08:32:55"); ElectricityMeter meter = new ElectricityMeter() { @@ -115,7 +116,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index cd06eea..3c2b652 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -27,8 +27,6 @@ using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; -using static FreeSql.Internal.GlobalFilter; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.ScheduledMeterReading { diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs index 2d9cc67..a6c09dc 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs @@ -33,5 +33,8 @@ namespace JiShe.CollectBus.Ammeters [FIELDColumn] public double Power => Voltage * Current; + + [FIELDColumn] + public double? Currentd { get; set; } } } From 38283db1b2f659d203a8408d1ba46c86f3272149 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 10:50:36 +0800 Subject: [PATCH 02/14] =?UTF-8?q?=E6=B5=8B=E8=AF=95IoTDB=E5=AE=9E=E4=BD=93?= =?UTF-8?q?=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Provider/IoTDBProvider.cs | 2 +- .../Samples/SampleAppService.cs | 23 ++++++++-- .../Ammeters/ElectricityMeter.cs | 2 + .../Ammeters/ElectricityMeterTreeModel.cs | 42 +++++++++++++++++++ 4 files changed, 64 insertions(+), 5 deletions(-) create mode 100644 services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 468506a..5b4e2a7 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -547,7 +547,7 @@ namespace JiShe.CollectBus.IoTDB.Provider if (entityTypeAttribute == null) { - throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)}的EntityType 没有指定,属于异常情况,-101"); + throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)} 的EntityType 没有指定,属于异常情况,-101"); } metadata.EntityType = entityTypeAttribute.EntityType; diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e1080af..2988b96 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -68,7 +68,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS messageHexString = messageHexString + timestamps; } - ElectricityMeter meter = new ElectricityMeter() + ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() { SystemName = "energy", DeviceId = "402440506", @@ -90,7 +90,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [HttpGet] public async Task UseTableSessionPool() { - ElectricityMeter meter2 = new ElectricityMeter() + var testTime = Convert.ToDateTime("2025-04-21 08:35:55"); + + ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel() { SystemName = "energy", DeviceId = "402440506", @@ -99,13 +101,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter2); _dbContext.UseTableSessionPool = true; - var testTime = Convert.ToDateTime("2025-04-21 08:32:55"); ElectricityMeter meter = new ElectricityMeter() { @@ -119,6 +120,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter); + + ElectricityMeter meter3 = new ElectricityMeter() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + Current = 10, + MeterModel = "DDZY-1980", + ProjectCode = "10059", + Voltage = 10, + Currentd = 22, + Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), + }; + await _iotDBProvider.InsertAsync(meter); } /// diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs index a6c09dc..cf936fe 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs @@ -4,10 +4,12 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.IoTDB.Provider; namespace JiShe.CollectBus.Ammeters { + [EntityType(EntityTypeEnum.TableModel)] public class ElectricityMeter : IoTEntity { [ATTRIBUTEColumn] diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs new file mode 100644 index 0000000..ddc7daa --- /dev/null +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; +using JiShe.CollectBus.IoTDB.Provider; + +namespace JiShe.CollectBus.Ammeters +{ + [EntityType(EntityTypeEnum.TreeModel)] + public class ElectricityMeterTreeModel : IoTEntity + { + [ATTRIBUTEColumn] + public string MeterModel { get; set; } + + /// + /// 下发消息内容 + /// + [FIELDColumn] + public string IssuedMessageHexString { get; set; } + + ///// + ///// 下发消息Id + ///// + //[FIELDColumn] + //public string IssuedMessageId { get; set; } + + [FIELDColumn] + public double Voltage { get; set; } + + [FIELDColumn] + public double Current { get; set; } + + [FIELDColumn] + public double Power => Voltage * Current; + + [FIELDColumn] + public double? Currentd { get; set; } + } +} From ca1a23fd3303a3e8c21e65b20636ef449ce5da65 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 14:20:49 +0800 Subject: [PATCH 03/14] =?UTF-8?q?=E4=BC=98=E5=8C=96IoTDB=E9=A9=B1=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interface/IIoTDBProvider.cs | 2 +- .../JiShe.CollectBus.IoTDB.csproj | 1 - .../{Provider => Model}/IoTEntity.cs | 6 +- .../Model/SingleMeasuringEntity.cs | 10 +- .../Provider/DevicePathBuilder.cs | 4 +- .../Provider/IoTDBProvider.cs | 177 ++++++++---------- .../Provider/SessionPoolAdapter.cs | 2 +- .../Provider/TableSessionPoolAdapter.cs | 2 +- .../Samples/SampleAppService.cs | 167 ++++++++++------- .../Samples/TestAppService.cs | 3 +- .../Ammeters/ElectricityMeter.cs | 2 +- .../Ammeters/ElectricityMeterTreeModel.cs | 9 +- .../Extensions/DateTimeExtensions.cs | 17 ++ 13 files changed, 214 insertions(+), 188 deletions(-) rename modules/JiShe.CollectBus.IoTDB/{Provider => Model}/IoTEntity.cs (79%) rename services/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs => modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs (58%) diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs index 02ac3ee..b896bdf 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs @@ -1,6 +1,6 @@ using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; -using JiShe.CollectBus.IoTDB.Provider; namespace JiShe.CollectBus.IoTDB.Interface { diff --git a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj index dc8f2fb..8cc4b33 100644 --- a/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj +++ b/modules/JiShe.CollectBus.IoTDB/JiShe.CollectBus.IoTDB.csproj @@ -6,7 +6,6 @@ enable - diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs similarity index 79% rename from modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs rename to modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index fc965e6..6431e6f 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -1,9 +1,9 @@ using JiShe.CollectBus.IoTDB.Attribute; -namespace JiShe.CollectBus.IoTDB.Provider +namespace JiShe.CollectBus.IoTDB.Model { /// - /// IoT实体基类 + /// IoT实体基类,此类适用于多个数据测点记录场景,单个测点请使用子类 SingleMeasuringEntity /// public abstract class IoTEntity { @@ -32,7 +32,7 @@ namespace JiShe.CollectBus.IoTDB.Provider public required string DeviceId { get; set; } /// - /// 当前时间戳,单位毫秒 + /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } diff --git a/services/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs similarity index 58% rename from services/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs rename to modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs index 0440c1d..72a8ae6 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/AFNEntity/SingleMeasuringAFNDataEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs @@ -4,19 +4,21 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.IoTDB.Provider; -namespace JiShe.CollectBus.IotSystems.AFNEntity +namespace JiShe.CollectBus.IoTDB.Model { /// - /// AFN单项数据实体 + /// 单项数据实体 /// - public class SingleMeasuringAFNDataEntity : IoTEntity + [EntityType(EntityTypeEnum.TreeModel)] + public class SingleMeasuringEntity : IoTEntity { /// /// 单项数据对象 /// [SingleMeasuring(nameof(SingleMeasuring))] - public Tuple SingleMeasuring { get; set; } + public required Tuple SingleMeasuring { get; set; } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index b78dfa3..a858c73 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -1,4 +1,6 @@ -namespace JiShe.CollectBus.IoTDB.Provider +using JiShe.CollectBus.IoTDB.Model; + +namespace JiShe.CollectBus.IoTDB.Provider { /// /// 设备路径构建器 diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index fdb9789..1fac82a 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -8,8 +8,10 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; +using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Provider @@ -56,13 +58,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var tablet = BuildTablet(new[] { entity }, metadata); - await CurrentSession.InsertAsync(tablet); - - //int result = await _currentSession.InsertAsync(tablet); - //if (result <= 0) - //{ - // _logger.LogError($"{typeof(T).Name}插入数据没有成功"); - //} + await CurrentSession.InsertAsync(tablet); } /// @@ -80,12 +76,7 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); - //var result = await _currentSession.InsertAsync(tablet); - //if (result <= 0) - //{ - // _logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。"); - //} + await CurrentSession.InsertAsync(tablet); } } @@ -159,7 +150,7 @@ namespace JiShe.CollectBus.IoTDB.Provider throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); } - if(metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + if (metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); } @@ -169,77 +160,65 @@ namespace JiShe.CollectBus.IoTDB.Provider } - foreach (var entity in entities) + foreach (var entity in entities) + { + timestamps.Add(entity.Timestamps); + var rowValues = new List(); + + foreach (var measurement in tempColumnNames) { - timestamps.Add(entity.Timestamps); - var rowValues = new List(); - foreach (var measurement in tempColumnNames) + PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); + if (propertyInfo == null) { + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); + } - PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); - if (propertyInfo == null) + var value = propertyInfo.GetValue(entity); + if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式 + { + if (value != null) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); - } - - var value = propertyInfo.GetValue(entity); - if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式 - { - if (value != null) + Type tupleType = value.GetType(); + Type[] tupleArgs = tupleType.GetGenericArguments(); + Type item2Type = tupleArgs[1]; // T 的实际类型 + var item1 = tupleType.GetProperty("Item1")!.GetValue(value); + var item2 = tupleType.GetProperty("Item2")!.GetValue(value); + if (item1 == null || item2 == null) { - Type tupleType = value.GetType(); - Type[] tupleArgs = tupleType.GetGenericArguments(); - Type item2Type = tupleArgs[1]; // T 的实际类型 - var item1 = tupleType.GetProperty("Item1")!.GetValue(value); - var item2 = tupleType.GetProperty("Item2")!.GetValue(value); - if (item1 == null || item2 == null) - { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); - } - - var indexOf = metadata.ColumnNames.IndexOf(measurement); - metadata.ColumnNames[indexOf] = (string)item1!; - - rowValues.Add(item2); - } - else - { - rowValues.Add(null); + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); } + + var indexOf = metadata.ColumnNames.IndexOf(measurement); + metadata.ColumnNames[indexOf] = (string)item1!; + + rowValues.Add(item2); } else { - - rowValues.Add(value); - - //if (value != null) - //{ - // rowValues.Add(value); - //} - //else - //{ - // ////填充默认数据值 - // //DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); - - // rowValues.Add(null); - //} + rowValues.Add(null); } - - } - - values.Add(rowValues); - - if (!_runtimeContext.UseTableSessionPool)//树模型 - { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); } else { - devicePaths.Add(DevicePathBuilder.GetTableName()); + + rowValues.Add(value); } + } + values.Add(rowValues); + + if (!_runtimeContext.UseTableSessionPool)//树模型 + { + devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + } + else + { + devicePaths.Add(DevicePathBuilder.GetTableName()); + } + } + if (devicePaths.Count > 1) { throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); @@ -434,36 +413,21 @@ namespace JiShe.CollectBus.IoTDB.Provider /// private DeviceMetadata GetMetadata() where T : IoTEntity { - - if (_runtimeContext.UseTableSessionPool)//表模型 - { - return _metadataCache.GetOrAdd(typeof(T), type => + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); + return MetadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => { - var columns = CollectColumnMetadata(type); + var columns = CollectColumnMetadata(t); var metadata = BuildDeviceMetadata(columns); - return metadata; - }); - } - else - { - // 树模型 - var columns = CollectColumnMetadata(typeof(T)); - var metadata = BuildDeviceMetadata(columns); - return _metadataCache.AddOrUpdate( - typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 - updateValueFactory: (t, existingValue) => - { - var columns = CollectColumnMetadata(t); - var metadata = BuildDeviceMetadata(columns); - - //对现有值 existingValue 进行修改,返回新值 - existingValue.ColumnNames = metadata.ColumnNames; - return existingValue; - } - ); - } + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); } /// @@ -477,21 +441,36 @@ namespace JiShe.CollectBus.IoTDB.Provider foreach (var prop in type.GetProperties()) { + + string typeName = string.Empty; + + Type declaredType = prop.PropertyType; + // 处理可空类型 + if (declaredType.IsGenericType && declaredType.GetGenericTypeDefinition() == typeof(Nullable<>)) + { + Type underlyingType = Nullable.GetUnderlyingType(declaredType); + typeName = underlyingType.Name; + } + else + { + typeName = declaredType.Name; + } + //先获取Tag标签和属性标签 ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( name: prop.Name, category: ColumnCategory.TAG, - dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), + dataType: GetDataTypeFromTypeName(typeName), false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.ATTRIBUTE, - GetDataTypeFromTypeName(prop.PropertyType.Name), + GetDataTypeFromTypeName(typeName), false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.FIELD, - GetDataTypeFromTypeName(prop.PropertyType.Name), + GetDataTypeFromTypeName(typeName), false) : null; @@ -544,7 +523,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); - + ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index dd04f60..fcb0c02 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -56,7 +56,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var result = await _sessionPool.InsertAlignedTabletAsync(tablet); if (result != 0) { - throw new Exception($"{nameof(TableSessionPoolAdapter)} "); + throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功,返回结果为:{result}"); } return result; diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index be42ad7..d22f356 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -54,7 +54,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var result = await _sessionPool.InsertAsync(tablet); if (result != 0) { - throw new Exception($"{nameof(TableSessionPoolAdapter)} "); + throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功,返回结果为:{result}"); } return result; diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index d729fe1..101ab0f 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -1,33 +1,27 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Apache.IoTDB.DataStructure; -using Apache.IoTDB; -using Confluent.Kafka; -using JiShe.CollectBus.Ammeters; -using JiShe.CollectBus.FreeSql; -using JiShe.CollectBus.IotSystems.PrepayModel; -using Microsoft.AspNetCore.Authorization; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Extensions.Options; -using Microsoft.Extensions.Logging; -using JiShe.CollectBus.IotSystems.AFNEntity; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using Microsoft.Extensions.DependencyInjection; -using JiShe.CollectBus.Common.Consts; -using JiShe.CollectBus.Common.Enums; -using System.Diagnostics.Metrics; -using JiShe.CollectBus.Common.DeviceBalanceControl; -using JiShe.CollectBus.Kafka.Attributes; -using System.Text.Json; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; -using JiShe.CollectBus.Common.Models; -using System.Diagnostics; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.DeviceBalanceControl; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; +using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IotSystems.PrepayModel; +using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; -using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; namespace JiShe.CollectBus.Samples; @@ -52,21 +46,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS /// /// 测试 UseSessionPool /// - /// + /// /// [HttpGet] - public async Task UseSessionPool(long timestamps) - { - string? messageHexString = null; - if (timestamps == 0) - { - timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - _logger.LogError($"timestamps_{timestamps}"); - } - else - { - messageHexString = messageHexString + timestamps; - } + public async Task UseSessionPool(DateTime testTime) + { ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() { @@ -77,8 +61,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - IssuedMessageHexString = messageHexString, - Timestamps = timestamps, + IssuedMessageHexString = "messageHexString", + Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter); } @@ -88,10 +72,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS /// /// [HttpGet] - public async Task UseTableSessionPool() + public async Task UseTableSessionPool(DateTime time) { - var testTime = Convert.ToDateTime("2025-04-21 08:35:55"); - + var testTime = time; ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel() { SystemName = "energy", @@ -101,7 +84,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), + Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter2); @@ -117,10 +100,39 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), + Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter); + } + + /// + /// 测试Session切换3 + /// + /// + [HttpGet] + public async Task UseTableSessionPool3(DateTime time) + { + var testTime = time; + ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + Current = 10, + MeterModel = "DDZY-1980", + ProjectCode = "10059", + Voltage = 10, + IssuedMessageHexString = "dsdfsfd", + Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + + }; + + await _iotDBProvider.InsertAsync(meter2); + + _dbContext.UseTableSessionPool = true; + + ElectricityMeter meter3 = new ElectricityMeter() { SystemName = "energy", @@ -131,7 +143,48 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS ProjectCode = "10059", Voltage = 10, Currentd = 22, - Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), + IssuedMessageHexString = "dsdfsfd", + Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), + }; + await _iotDBProvider.InsertAsync(meter3); + } + + /// + /// 测试单个测点数据项 + /// + /// + /// + [HttpGet] + public async Task TestSingleMeasuringAFNData(string measuring, string value, DateTime time) + { + var meter = new SingleMeasuringEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), + SingleMeasuring = new Tuple(measuring, value) + }; + await _iotDBProvider.InsertAsync(meter); + } + + /// + /// 测试单个测点数据项2 + /// + /// + /// + [HttpGet] + public async Task TestSingleMeasuringAFNData2(string measuring, int value, DateTime time) + { + var meter = new SingleMeasuringEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), + SingleMeasuring = new Tuple(measuring, value) }; await _iotDBProvider.InsertAsync(meter); } @@ -186,27 +239,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await Task.CompletedTask; } - - - /// - /// 测试单个测点数据项 - /// - /// - /// - [HttpGet] - public async Task TestSingleMeasuringAFNData(string measuring, string value) - { - var meter = new SingleMeasuringAFNDataEntity() - { - SystemName = "energy", - DeviceId = "402440506", - DeviceType = "Ammeter", - ProjectCode = "10059", - Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), - SingleMeasuring = new Tuple(measuring, value) - }; - await _iotDBProvider.InsertAsync(meter); - } + /// /// 测试Redis批量读取10万条数据性能 diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index 68d367c..292824e 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -11,8 +11,7 @@ using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using Microsoft.Extensions.Logging; -using JiShe.CollectBus.Common.Helpers; -using JiShe.CollectBus.IotSystems.AFNEntity; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.Cassandra; diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs index cf936fe..ee132d3 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs @@ -5,7 +5,7 @@ using System.Text; using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Enums; -using JiShe.CollectBus.IoTDB.Provider; +using JiShe.CollectBus.IoTDB.Model; namespace JiShe.CollectBus.Ammeters { diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs index ddc7daa..7fe7ebc 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeterTreeModel.cs @@ -1,11 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Enums; -using JiShe.CollectBus.IoTDB.Provider; +using JiShe.CollectBus.IoTDB.Model; namespace JiShe.CollectBus.Ammeters { diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index e6136df..2bdcf6c 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -233,5 +233,22 @@ namespace JiShe.CollectBus.Common.Extensions .AddHours(hours) .AddMinutes(minutes); } + + + /// + /// 将 DateTime 时间转换为 DateTimeOffset 时间 + /// + /// + /// + public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) + { + //确保 Kind 为 Local(如果是 Unspecified) + DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) + : rawDateTime; + + // 转换为 DateTimeOffset(自动应用本地时区偏移) + return new DateTimeOffset(localDateTime); + } } } From 309f5c37d27eeb02bb26166b96b1474a37e4fbcd Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 14:57:12 +0800 Subject: [PATCH 04/14] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=8D=95=E4=BE=A7?= =?UTF-8?q?=E7=82=B9=E8=A1=A8=E6=A8=A1=E5=9E=8B=E8=B7=AF=E5=BE=84=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=97=AE=E9=A2=98=EF=BC=8C=E5=B9=B6=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E8=B7=AF=E5=BE=84Attribute=EF=BC=8C=E4=BB=A5=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=9B=BA=E5=AE=9A=E5=AD=98=E5=82=A8=E8=B7=AF=E5=BE=84=E5=9C=BA?= =?UTF-8?q?=E6=99=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Attribute/SingleMeasuringAttribute.cs | 2 +- .../Attribute/TableNameOrTreePathAttribute.cs | 18 ++++++ .../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 2 +- .../Model/TableModelSingleMeasuringEntity.cs | 24 ++++++++ ...y.cs => TreeModelSingleMeasuringEntity.cs} | 6 +- .../Provider/DevicePathBuilder.cs | 11 ++++ .../Provider/IoTDBProvider.cs | 29 ++++++++-- .../Samples/SampleAppService.cs | 55 +++++++++++++++++-- 8 files changed, 132 insertions(+), 15 deletions(-) create mode 100644 modules/JiShe.CollectBus.IoTDB/Attribute/TableNameOrTreePathAttribute.cs create mode 100644 modules/JiShe.CollectBus.IoTDB/Model/TableModelSingleMeasuringEntity.cs rename modules/JiShe.CollectBus.IoTDB/Model/{SingleMeasuringEntity.cs => TreeModelSingleMeasuringEntity.cs} (80%) diff --git a/modules/JiShe.CollectBus.IoTDB/Attribute/SingleMeasuringAttribute.cs b/modules/JiShe.CollectBus.IoTDB/Attribute/SingleMeasuringAttribute.cs index cebb85a..5f0ca07 100644 --- a/modules/JiShe.CollectBus.IoTDB/Attribute/SingleMeasuringAttribute.cs +++ b/modules/JiShe.CollectBus.IoTDB/Attribute/SingleMeasuringAttribute.cs @@ -1,7 +1,7 @@ namespace JiShe.CollectBus.IoTDB.Attribute { /// - /// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple,Item1=>测点名称,Item2=>测点值,泛型 + /// 用于标识当前实体为单侧点模式,单侧点模式只有一个Filed标识字段,类型是Tuple,Item1=>测点名称,Item2=>测点值,泛型 /// [AttributeUsage(AttributeTargets.Property)] public class SingleMeasuringAttribute : System.Attribute diff --git a/modules/JiShe.CollectBus.IoTDB/Attribute/TableNameOrTreePathAttribute.cs b/modules/JiShe.CollectBus.IoTDB/Attribute/TableNameOrTreePathAttribute.cs new file mode 100644 index 0000000..1b4f4f0 --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Attribute/TableNameOrTreePathAttribute.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.IoTDB.Enums; + +namespace JiShe.CollectBus.IoTDB.Attribute +{ + /// + /// IoTDB实体存储路径或表名称,一般用于已经明确的存储路径或表名称,例如日志存储 + /// + [AttributeUsage(AttributeTargets.Class)] + public class TableNameOrTreePathAttribute : System.Attribute + { + public string TableNameOrTreePath { get; } + + public TableNameOrTreePathAttribute(string tableNameOrTreePath) + { + TableNameOrTreePath = tableNameOrTreePath; + } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 6431e6f..73a90e0 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -3,7 +3,7 @@ namespace JiShe.CollectBus.IoTDB.Model { /// - /// IoT实体基类,此类适用于多个数据测点记录场景,单个测点请使用子类 SingleMeasuringEntity + /// IoT实体基类,此类适用于多个数据测点记录场景,单个测点请使用子类 SingleMeasuring /// public abstract class IoTEntity { diff --git a/modules/JiShe.CollectBus.IoTDB/Model/TableModelSingleMeasuringEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/TableModelSingleMeasuringEntity.cs new file mode 100644 index 0000000..4308dae --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Model/TableModelSingleMeasuringEntity.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; +using JiShe.CollectBus.IoTDB.Provider; + +namespace JiShe.CollectBus.IoTDB.Model +{ + /// + /// Table模型单项数据实体 + /// + [EntityType(EntityTypeEnum.TableModel)] + public class TableModelSingleMeasuringEntity : IoTEntity + { + /// + /// 单项数据键值对 + /// + [SingleMeasuring(nameof(SingleColumn))] + public required Tuple SingleColumn { get; set; } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/TreeModelSingleMeasuringEntity.cs similarity index 80% rename from modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs rename to modules/JiShe.CollectBus.IoTDB/Model/TreeModelSingleMeasuringEntity.cs index 72a8ae6..9b3609c 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/SingleMeasuringEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/TreeModelSingleMeasuringEntity.cs @@ -10,13 +10,13 @@ using JiShe.CollectBus.IoTDB.Provider; namespace JiShe.CollectBus.IoTDB.Model { /// - /// 单项数据实体 + /// Tree模型单项数据实体 /// [EntityType(EntityTypeEnum.TreeModel)] - public class SingleMeasuringEntity : IoTEntity + public class TreeModelSingleMeasuringEntity : IoTEntity { /// - /// 单项数据对象 + /// 单项数据键值对 /// [SingleMeasuring(nameof(SingleMeasuring))] public required Tuple SingleMeasuring { get; set; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index a858c73..f9127db 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -30,6 +30,17 @@ namespace JiShe.CollectBus.IoTDB.Provider var type = typeof(T); return $"{type.Name.ToLower()}"; } + + /// + /// 获取表名称,用作单侧点表模型特殊处理。 + /// + /// + /// + /// + public static string GetDeviceTableName(T entity) where T : IoTEntity + { + return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + } } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 1fac82a..9b76a6f 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -159,7 +159,13 @@ namespace JiShe.CollectBus.IoTDB.Provider throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); } - + string tableNameOrTreePath = string.Empty; + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + if (tableNameOrTreePathAttribute != null) + { + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + } + foreach (var entity in entities) { timestamps.Add(entity.Timestamps); @@ -198,6 +204,12 @@ namespace JiShe.CollectBus.IoTDB.Provider { rowValues.Add(null); } + + //同时如果是单测点模式,且是table模型存储,路径只能通过DevicePathBuilder.GetDeviceTableName(entity)获取 + if (_runtimeContext.UseTableSessionPool) + { + tableNameOrTreePath = DevicePathBuilder.GetDeviceTableName(entity); + } } else { @@ -209,14 +221,23 @@ namespace JiShe.CollectBus.IoTDB.Provider values.Add(rowValues); - if (!_runtimeContext.UseTableSessionPool)//树模型 + //如果指定了路径 + if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + devicePaths.Add(tableNameOrTreePath); } else { - devicePaths.Add(DevicePathBuilder.GetTableName()); + if (!_runtimeContext.UseTableSessionPool)//树模型 + { + devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + } + else + { + devicePaths.Add(DevicePathBuilder.GetTableName()); + } } + } if (devicePaths.Count > 1) diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 101ab0f..7693636 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -9,6 +9,7 @@ using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; @@ -150,14 +151,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS } /// - /// 测试单个测点数据项 + /// 测试树模型单个测点数据项 /// /// /// [HttpGet] - public async Task TestSingleMeasuringAFNData(string measuring, string value, DateTime time) + public async Task TestTreeModelSingleMeasuringEntity(string measuring, string value, DateTime time) { - var meter = new SingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", @@ -170,14 +171,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS } /// - /// 测试单个测点数据项2 + /// 测试树模型单个测点数据项2 /// /// /// [HttpGet] - public async Task TestSingleMeasuringAFNData2(string measuring, int value, DateTime time) + public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time) { - var meter = new SingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", @@ -189,6 +190,48 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await _iotDBProvider.InsertAsync(meter); } + /// + /// 测试表模型单个测点数据项 + /// + /// + /// + [HttpGet] + public async Task TestTableModelSingleMeasuringEntity(string measuring, string value, DateTime time) + { + var meter = new TableModelSingleMeasuringEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), + SingleColumn = new Tuple(measuring, value) + }; + _dbContext.UseTableSessionPool = true; + await _iotDBProvider.InsertAsync(meter); + } + + /// + /// 测试表模型单个测点数据项2 + /// + /// + /// + [HttpGet] + public async Task TestTableModelSingleMeasuringEntity2(string measuring, int value, DateTime time) + { + var meter = new TableModelSingleMeasuringEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectCode = "10059", + Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), + SingleColumn = new Tuple(measuring, value) + }; + _dbContext.UseTableSessionPool = true; + await _iotDBProvider.InsertAsync(meter); + } + /// /// 测试设备分组均衡控制算法 /// From a6d970af19d6fbf2020447bc31ac6765862833eb Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 22:57:49 +0800 Subject: [PATCH 05/14] =?UTF-8?q?=E4=BC=98=E5=8C=96IoTDB=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E6=98=A0=E5=B0=84=EF=BC=8C=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=BE=AE=E7=A7=92=E5=92=8C=E7=BA=B3=E7=A7=92=E7=9A=84=E6=89=A9?= =?UTF-8?q?=E5=B1=95=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusIoTDBModule.cs | 4 +- .../Context/IoTDBRuntimeContext.cs | 5 +- .../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 9 +- .../Provider/DevicePathBuilder.cs | 4 +- .../Provider/IoTDBProvider.cs | 114 +++-- .../Internal/KafkaOptionConfig.cs | 2 +- .../Samples/SampleAppService.cs | 22 +- .../BasicScheduledMeterReadingService.cs | 402 +++++++++--------- ...nergySystemScheduledMeterReadingService.cs | 8 +- .../MeterReadingTelemetryPacketInfo.cs | 102 +++-- .../Encrypt/EncryptUtil.cs | 65 +++ .../Extensions/DateTimeExtensions.cs | 38 +- .../Extensions/DateTimeOffsetExtensions.cs | 101 +++++ .../Helpers/CommonHelper.cs | 4 +- .../Pages/Monitor.cshtml | 2 +- web/JiShe.CollectBus.Host/appsettings.json | 3 +- 16 files changed, 547 insertions(+), 338 deletions(-) create mode 100644 shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs create mode 100644 shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs index 2cce113..6d26bdc 100644 --- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs +++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs @@ -16,7 +16,7 @@ public class CollectBusIoTDbModule : AbpModule var configuration = context.Services.GetConfiguration(); Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); - // 注册上下文为Scoped - context.Services.AddScoped(); + //// 注册上下文为Scoped + //context.Services.AddScoped(); } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index afad488..ef68325 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -1,16 +1,17 @@ using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Context { /// /// IoTDB SessionPool 运行时上下文 /// - public class IoTDbRuntimeContext + public class IoTDBRuntimeContext: IScopedDependency { private readonly bool _defaultValue; - public IoTDbRuntimeContext(IOptions options) + public IoTDBRuntimeContext(IOptions options) { _defaultValue = options.Value.UseTableSessionPoolByDefault; UseTableSessionPool = _defaultValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 73a90e0..f3cdbe9 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDB.Model /// 项目编码 /// [TAGColumn] - public required string ProjectCode { get; set; } + public required string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 @@ -32,8 +32,13 @@ namespace JiShe.CollectBus.IoTDB.Model public required string DeviceId { get; set; } /// - /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取 + /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + /// + /// 数据创建时间戳,单位毫秒,必须通过DateTimeOffset获取 + /// + public virtual long CreationTime { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index f9127db..6a1a596 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9b76a6f..b9c2cf0 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; private readonly IIoTDbSessionFactory _sessionFactory; - private readonly IoTDbRuntimeContext _runtimeContext; + private readonly IoTDBRuntimeContext _runtimeContext; private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); @@ -38,7 +38,7 @@ namespace JiShe.CollectBus.IoTDB.Provider public IoTDbProvider( ILogger logger, IIoTDbSessionFactory sessionFactory, - IoTDbRuntimeContext runtimeContext) + IoTDBRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; @@ -54,11 +54,19 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task InsertAsync(T entity) where T : IoTEntity { - var metadata = GetMetadata(); + try + { + var metadata = GetMetadata(); - var tablet = BuildTablet(new[] { entity }, metadata); + var tablet = BuildTablet(new[] { entity }, metadata); - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet); + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常"); + throw; + } } /// @@ -68,15 +76,23 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { - var metadata = GetMetadata(); - - var batchSize = 1000; - var batches = entities.Chunk(batchSize); - - foreach (var batch in batches) + try { - var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + var metadata = GetMetadata(); + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, metadata); + await CurrentSession.InsertAsync(tablet); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + throw; } } @@ -89,18 +105,26 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task DeleteAsync(QueryOptions options) where T : IoTEntity { - var query = BuildDeleteSQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - if (!sessionDataSet.HasNext()) + try { - _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); - return 0; - } + var query = BuildDeleteSQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - //获取唯一结果行 - var row = sessionDataSet.Next(); - return row.Values[0]; + if (!sessionDataSet.HasNext()) + { + _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); + return 0; + } + + //获取唯一结果行 + var row = sessionDataSet.Next(); + return row.Values[0]; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常"); + throw; + } } /// @@ -111,16 +135,24 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { - var query = BuildQuerySQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - var result = new BusPagedResult + try { - TotalCount = await GetTotalCount(options), - Items = ParseResults(sessionDataSet, options.PageSize) - }; + var query = BuildQuerySQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - return result; + var result = new BusPagedResult + { + TotalCount = await GetTotalCount(options), + Items = ParseResults(sessionDataSet, options.PageSize) + }; + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(QueryAsync)} 查询数据时发生异常"); + throw; + } } /// @@ -160,12 +192,12 @@ namespace JiShe.CollectBus.IoTDB.Provider } string tableNameOrTreePath = string.Empty; - var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); if (tableNameOrTreePathAttribute != null) { - tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; } - + foreach (var entity in entities) { timestamps.Add(entity.Timestamps); @@ -214,7 +246,7 @@ namespace JiShe.CollectBus.IoTDB.Provider else { - rowValues.Add(value); + rowValues.Add(value); } } @@ -237,7 +269,7 @@ namespace JiShe.CollectBus.IoTDB.Provider devicePaths.Add(DevicePathBuilder.GetTableName()); } } - + } if (devicePaths.Count > 1) @@ -258,8 +290,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, - List> values, List timestamps) + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 @@ -280,8 +311,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, - List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( tableName, @@ -639,7 +669,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = TSDataType.DOUBLE, ["TEXT"] = TSDataType.TEXT, ["NULLTYPE"] = TSDataType.NONE, - ["TIMESTAMP"] = TSDataType.TIMESTAMP, + ["DATETIME"] = TSDataType.TIMESTAMP, ["DATE"] = TSDataType.DATE, ["BLOB"] = TSDataType.BLOB, ["DECIMAL"] = TSDataType.STRING, @@ -659,7 +689,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = 0.0d, ["TEXT"] = string.Empty, ["NULLTYPE"] = null, - ["TIMESTAMP"] = null, + ["DATETIME"] = null, ["DATE"] = null, ["BLOB"] = null, ["DECIMAL"] = "0.0", diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 5f7bdf1..3bea5f1 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -57,5 +57,5 @@ public class KafkaOptionConfig /// /// 首次采集时间 /// - public DateTime FirstCollectionTime { get; set; } + public DateTime? FirstCollectionTime { get; set; } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 7693636..b616544 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -30,12 +30,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { private readonly ILogger _logger; private readonly IIoTDbProvider _iotDBProvider; - private readonly IoTDbRuntimeContext _dbContext; + private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, - IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; @@ -60,7 +60,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "messageHexString", Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), @@ -83,7 +83,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; @@ -99,7 +99,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; @@ -122,7 +122,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "dsdfsfd", Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), @@ -141,7 +141,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Currentd = 22, IssuedMessageHexString = "dsdfsfd", @@ -163,7 +163,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -183,7 +183,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -203,7 +203,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; @@ -224,7 +224,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4ff5dd6..0fb4c49 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,6 @@ -using DnsClient.Protocol; +using Confluent.Kafka; +using DnsClient.Protocol; +using FreeSql; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; @@ -9,6 +11,7 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; @@ -19,6 +22,8 @@ using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; using Mapster; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -38,22 +43,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; - private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; + private readonly IoTDBRuntimeContext _runtimeContext; public BasicScheduledMeterReadingService( ILogger logger, - IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, + IoTDBRuntimeContext runtimeContext, IOptions kafkaOptions) { _logger = logger; _dbProvider = dbProvider; - _meterReadingRecordRepository = meterReadingRecordRepository; + _runtimeContext = runtimeContext; _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; @@ -133,17 +138,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}"); + List pushTaskInfos = new(); - _ = CreateMeterPublishTask( + await CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), meterType: MeterTypeEnum.Ammeter, - taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + if (tempTask == null || tempTask.Count <= 0) + { + return; + } + + pushTaskInfos.AddRange(tempTask); + + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(tempTask); + //} + + _runtimeContext.UseTableSessionPool = true; + _dbProvider.BatchInsertAsync(tempTask); }); + //if (pushTaskInfos.Count <= 0) + //{ + // _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有任务数据信息,-1051"); + // continue; + //} + + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(pushTaskInfos); + //} + + //_dbContext.UseTableSessionPool = true; + //await _dbProvider.BatchInsertAsync(pushTaskInfos); + } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -152,7 +189,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { @@ -193,11 +230,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - return; - - - +#if DEBUG var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; @@ -237,8 +270,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -261,13 +294,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + if (_kafkaOptions.FirstCollectionTime.HasValue == false) + { + _kafkaOptions.FirstCollectionTime = DateTime.Now; + } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -465,60 +502,60 @@ namespace JiShe.CollectBus.ScheduledMeterReading } - /// - /// 创建电表待发送的任务数据 - /// - /// 采集频率 - /// 时间格式的任务批次名称 - /// - private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) - { - var timer = Stopwatch.StartNew(); + ///// + ///// 创建电表待发送的任务数据 + ///// + ///// 采集频率 + ///// 时间格式的任务批次名称 + ///// + //private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) + //{ + // var timer = Stopwatch.StartNew(); - //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // //获取对应频率中的所有电表信息 + // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + // List meterInfos = new List(); + // decimal? cursor = null; + // string member = null; + // bool hasNext; + // do + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + // } while (hasNext); - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } + // if (meterInfos == null || meterInfos.Count <= 0) + // { + // timer.Stop(); + // _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + // return; + // } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); - } - ); + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + // } + // ); - timer.Stop(); - _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); - } + // timer.Stop(); + // _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + //} /// @@ -527,38 +564,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 采集频率 /// 电表信息 /// 集中器所在分组 - /// 时间格式的任务批次名称 + /// 采集频率对应的时间戳 /// - private void AmmerterCreatePublishTaskAction(int timeDensity - , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) + private List AmmerterCreatePublishTaskAction(int timeDensity + , AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) { + var currentTime = DateTime.Now; + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); - return; + return null; } //载波的不处理 if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); - return; + return null; } if (ammeterInfo.State.Equals(2)) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); - return; + return null; } ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 @@ -571,22 +603,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); - return; + return null; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); - return; + return null; } if (Convert.ToInt32(ammeterInfo.Address) > 65535) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); - return; + return null; } if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); - return; + return null; } List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; @@ -613,7 +645,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (tempSubCodes == null || tempSubCodes.Count <= 0) { //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); - return; + return null; } else { @@ -683,18 +715,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { - ProjectID = ammeterInfo.ProjectID, + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MemberId}", + Timestamps = timestamps.GetDateTimeOffset().ToUnixTimeMilliseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, + PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - MeterId = ammeterInfo.MeterId, - MeterType = MeterTypeEnum.Ammeter, FocusAddress = ammeterInfo.FocusAddress, - FocusId = ammeterInfo.FocusId, - AFN = aFN, + AFN = (int)aFN, Fn = fn, - Seq = builderResponse.Seq, + //Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = tempItem, TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), @@ -709,37 +742,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading taskList.Add(meterReadingRecords); } - if (taskList == null - || taskList.Count() <= 0 - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); - return; - } + return taskList; - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - foreach (var item in taskList) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize()); + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(taskList); + //} - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId); - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId); - } - pipe.EndPipe(); - } - - //await _redisDataCacheService.BatchInsertDataAsync( - // redisCacheTelemetryPacketInfoHashKey, - // redisCacheTelemetryPacketInfoSetIndexKey, - // redisCacheTelemetryPacketInfoZSetScoresIndexKey, - // taskList); } #endregion @@ -864,7 +876,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } ////删除任务数据 @@ -877,52 +889,52 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - /// - /// 创建水表待发送的任务数据 - /// - /// 采集频率 - /// 水表信息 - /// 集中器所在分组 - /// 时间格式的任务批次名称 - /// - private void WatermeterCreatePublishTaskAction(int timeDensity - , WatermeterInfo meterInfo, int groupIndex, string taskBatch) - { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + ///// + ///// 创建水表待发送的任务数据 + ///// + ///// 采集频率 + ///// 水表信息 + ///// 集中器所在分组 + ///// 时间格式的任务批次名称 + ///// + //private void WatermeterCreatePublishTaskAction(int timeDensity + // , WatermeterInfo meterInfo, int groupIndex, string taskBatch) + //{ + // var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + // var currentTime = DateTime.Now; + // var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var taskInfo = new MeterReadingTelemetryPacketInfo() - { - Seq= null, - - }; - // + // var taskInfo = new MeterReadingTelemetryPacketInfo() + // { + // Seq= null, - Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + // }; + // // - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); + // Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); + // using (var pipe = FreeRedisProvider.Instance.StartPipe()) + // { + // // 主数据存储Hash + // pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); + // // Set索引缓存 + // pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); - pipe.EndPipe(); - } + // // ZSET索引缓存Key + // pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); - } + // pipe.EndPipe(); + // } + + //} #endregion @@ -961,11 +973,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 创建表的待发送的任务数据 /// /// 采集频率 - /// 时间格式的任务批次名称 + /// 采集频率对应的任务时间戳 /// 表类型 /// 具体的创建任务的委托 /// - private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + private async Task CreateMeterPublishTask(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel { var timer = Stopwatch.StartNew(); @@ -978,20 +990,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading decimal? cursor = null; string member = null; bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); + + + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1, + lastScore: cursor, + lastMember: member); + meterInfos.AddRange(page.Items); if (meterInfos == null || meterInfos.Count <= 0) { @@ -1000,13 +1021,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - taskCreateAction(timeDensity, data, groupIndex, taskBatch); + taskCreateAction(timeDensity, data, groupIndex, nextTaskTime); } ); @@ -1034,29 +1054,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading string member = null; bool hasNext; var stopwatch = Stopwatch.StartNew(); - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: page.Items, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); - } - ); + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: page.Items, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + // } + // ); - } while (hasNext); + //} while (hasNext); stopwatch.Stop(); _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index b8fd08b..fe0746f 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -35,18 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { string serverTagName = string.Empty; + public EnergySystemScheduledMeterReadingService( ILogger logger, IIoTDbProvider dbProvider, - IMeterReadingRecordRepository meterReadingRecordRepository, - IOptions kafkaOptions, + IOptions kafkaOptions, + IoTDBRuntimeContext runtimeContext, IProducerService producerService, IRedisDataCacheService redisDataCacheService) : base(logger, - meterReadingRecordRepository, producerService, redisDataCacheService, dbProvider, + runtimeContext, kafkaOptions) { serverTagName = kafkaOptions.Value.ServerTagName; diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 3aafa41..3bffbb8 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -1,5 +1,9 @@ -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Encrypt; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; +using JiShe.CollectBus.IoTDB.Model; using System; using System.Collections.Generic; using System.Linq; @@ -13,78 +17,85 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 抄读任务Redis缓存数据记录 /// - public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel + [EntityType(EntityTypeEnum.TableModel)] + public class MeterReadingTelemetryPacketInfo : IoTEntity { /// - /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// 排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳、或者某一个固定的标识1 /// - public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}"; - - /// - /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 - /// - public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; - - + [FIELDColumn] + public string ScoreValue + { + get + { + return $"{FocusAddress}.{TaskMark}".Md5Fun(); + } + } + /// /// 是否手动操作 /// + [FIELDColumn] public bool ManualOrNot { get; set; } /// /// 任务数据唯一标记 /// - public decimal TaskMark { get; set; } - - /// - /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 - /// - public long Timestamps { get; set; } + [FIELDColumn] + public string TaskMark { get; set; } /// /// 是否超时 /// + [FIELDColumn] public bool IsTimeout { get; set; } = false; /// /// 待抄读时间 /// + [FIELDColumn] public DateTime PendingCopyReadTime { get; set; } - - + + /// + /// 集中器Id + /// + [FIELDColumn] + public int FocusId { get; set; } + + /// + /// 表Id + /// + [FIELDColumn] + public int MeterId { get; set; } + /// /// 集中器地址 /// + [FIELDColumn] public string FocusAddress { get; set; } - + /// /// 表地址 /// + [FIELDColumn] public string MeterAddress { get; set; } - /// - /// 表类型 - /// - public MeterTypeEnum MeterType { get; set; } - - /// - /// 项目ID - /// - public int ProjectID { get; set; } - /// /// 数据库业务ID /// + [FIELDColumn] public int DatabaseBusiID { get; set; } /// /// AFN功能码 /// - public AFN AFN { get; set; } + [FIELDColumn] + public int AFN { get; set; } /// /// 抄读功能码 /// + [FIELDColumn] public int Fn { get; set; } /// @@ -95,66 +106,73 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 采集项编码 /// - public string ItemCode { get; set;} + [FIELDColumn] + public string ItemCode { get; set; } - /// - /// 帧序列域SEQ - /// - public required Seq Seq { get; set; } + ///// + ///// 帧序列域SEQ + ///// + //public required Seq Seq { get; set; } /// /// 地址域A3的主站地址MSA /// + [FIELDColumn] public int MSA { get; set; } /// /// 是否发送 /// + [FIELDColumn] public bool IsSend { get; set; } /// /// 创建时间 /// + [FIELDColumn] public DateTime CreationTime { get; set; } /// /// 下发消息内容 /// + [FIELDColumn] public string IssuedMessageHexString { get; set; } /// /// 下发消息Id /// + [FIELDColumn] public string IssuedMessageId { get; set; } /// /// 消息上报内容 /// + [FIELDColumn] public string? ReceivedMessageHexString { get; set; } /// /// 消息上报时间 /// + [FIELDColumn] public DateTime? ReceivedTime { get; set; } /// /// 上报消息Id /// - public string ReceivedMessageId { get; set; } + [FIELDColumn] + public string ReceivedMessageId { get; set; } /// /// 上报报文解析备注,异常情况下才有 /// + [FIELDColumn] public string ReceivedRemark { get; set; } /// /// 是否已上报 /// + [FIELDColumn] public bool IsReceived { get; set; } - - //public void CreateDataId(Guid Id) - //{ - // this.Id = Id; - //} + } } diff --git a/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs new file mode 100644 index 0000000..72cdf41 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Encrypt +{ + /// + /// 各种加密辅助类 + /// + public static class EncryptUtil + { + #region MD5加密 + + /// + /// MD5加密 + /// + public static string Md5Fun(this string value) + { + if (value == null) + { + throw new ArgumentNullException("未将对象引用设置到对象的实例。"); + } + + var encoding = Encoding.UTF8; + MD5 md5 = MD5.Create(); + return HashAlgorithmBase(md5, value, encoding); + } + + /// + /// 加权MD5加密 + /// + public static string Md5Fun(this string value, string salt) + { + return salt == null ? value.Md5Fun() : (value + "『" + salt + "』").Md5Fun(); + } + + #endregion + + /// + /// HashAlgorithm 加密统一方法 + /// + private static string HashAlgorithmBase(HashAlgorithm hashAlgorithmObj, string source, Encoding encoding) + { + byte[] btStr = encoding.GetBytes(source); + byte[] hashStr = hashAlgorithmObj.ComputeHash(btStr); + return hashStr.Bytes2Str(); + } + + /// + /// 转换成字符串 + /// + private static string Bytes2Str(this IEnumerable source, string formatStr = "{0:X2}") + { + StringBuilder pwd = new StringBuilder(); + foreach (byte btStr in source) + { + pwd.AppendFormat(formatStr, btStr); + } + return pwd.ToString(); + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 2bdcf6c..e72f4fd 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -181,25 +181,7 @@ namespace JiShe.CollectBus.Common.Extensions return $"{dateTime:yyyyMMddHH}"; #endif } - - /// - /// 获取当前时间毫秒级时间戳 - /// - /// - public static long GetCurrentTimeMillis() - { - return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - } - - /// - /// 将Unix时间戳转换为日期时间 - /// - /// - /// - public static DateTime FromUnixMillis(long millis) - { - return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; - } + /// /// 采集时间节点计算 @@ -233,22 +215,6 @@ namespace JiShe.CollectBus.Common.Extensions .AddHours(hours) .AddMinutes(minutes); } - - - /// - /// 将 DateTime 时间转换为 DateTimeOffset 时间 - /// - /// - /// - public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) - { - //确保 Kind 为 Local(如果是 Unspecified) - DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified - ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) - : rawDateTime; - - // 转换为 DateTimeOffset(自动应用本地时区偏移) - return new DateTimeOffset(localDateTime); - } + } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs new file mode 100644 index 0000000..c73fbd0 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; + +namespace JiShe.CollectBus.Common.Extensions +{ + public static class DateTimeOffsetExtensions + { + + /// + /// 获取当前时间毫秒级时间戳 + /// + /// + public static long GetCurrentTimeMillis() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// + /// 将Unix时间戳转换为日期时间 + /// + /// + /// + public static DateTime FromUnixMillis(long millis) + { + return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; + } + + /// + /// 采集时间节点计算 + /// + /// 待采集时间 + /// + /// + public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval) + { + // 计算精确到分钟的基准时间 + var baseTime = new DateTime( + referenceTime.Year, + referenceTime.Month, + referenceTime.Day, + referenceTime.Hour, + referenceTime.Minute, + 0); + + // 计算总分钟数和下一个间隔点 + int totalMinutes = baseTime.Hour * 60 + baseTime.Minute; + int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval; + + // 处理跨天情况 + int daysToAdd = nextTotalMinutes / (24 * 60); + int remainingMinutes = nextTotalMinutes % (24 * 60); + int hours = remainingMinutes / 60; + int minutes = remainingMinutes % 60; + + return baseTime.Date + .AddDays(daysToAdd) + .AddHours(hours) + .AddMinutes(minutes); + } + + + /// + /// 将 DateTime 时间转换为 DateTimeOffset 时间 + /// + /// + /// + public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) + { + //确保 Kind 为 Local(如果是 Unspecified) + DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) + : rawDateTime; + + // 转换为 DateTimeOffset(自动应用本地时区偏移) + return new DateTimeOffset(localDateTime); + } + + private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks; + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数 + /// + public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset) + { + // Ticks 单位是 100 纳秒,转换为微秒需除以 10 + long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks; + return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks + } + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数 + /// + public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset) + { + long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100; + return nanoseconds; + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 3c36d23..34cf37f 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -769,11 +769,11 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// - public static decimal GetTaskMark(int afn, int fn, int pn, int msa) + public static string GetTaskMark(int afn, int fn, int pn, int msa) { var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; - return Convert.ToInt32(makstr) << 32 | msa; + return makstr;// Convert.ToInt32(makstr) << 32 | msa; } } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index afe25da..88209c2 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +