diff --git a/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs b/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs new file mode 100644 index 0000000..3610c00 --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/Attribute/EntityTypeAttribute.cs @@ -0,0 +1,19 @@ +using JiShe.CollectBus.IoTDB.Enums; + +namespace JiShe.CollectBus.IoTDB.Attribute +{ + /// + /// IoTDB实体类型特性 + /// + [AttributeUsage(AttributeTargets.Class)] + public class EntityTypeAttribute : System.Attribute + { + public EntityTypeEnum EntityType { get; } + + + public EntityTypeAttribute(EntityTypeEnum entityType) + { + EntityType = entityType; + } + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index cd99b00..f321fa0 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDB.Context } /// - /// 是否使用表模型存储, 默认false,使用tree模型存储 + /// 存储模型切换标识,是否使用table模型存储, 默认为false,标识tree模型存储 /// public bool UseTableSessionPool { get; set; } diff --git a/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs b/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs new file mode 100644 index 0000000..26c6645 --- /dev/null +++ b/modules/JiShe.CollectBus.IoTDB/EnumInfo/EntityTypeEnum.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDB.Enums +{ + /// + /// IoTDB实体类型枚举 + /// + public enum EntityTypeEnum + { + /// + /// 树模型 + /// + TreeModel = 1, + + /// + /// 表模型 + /// + TableModel = 2, + } +} diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index 447f6ce..a093bb7 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -1,4 +1,5 @@ using Apache.IoTDB; +using JiShe.CollectBus.IoTDB.Enums; namespace JiShe.CollectBus.IoTDB.Provider { @@ -7,6 +8,11 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public class DeviceMetadata { + /// + /// IoTDB实体类型枚举 + /// + public EntityTypeEnum EntityType { get; set; } + /// /// 是否有单测量值 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index 46ce091..b78dfa3 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -13,7 +13,7 @@ /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9e18ac8..468506a 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -1,4 +1,5 @@ -using System.Collections.Concurrent; +using System; +using System.Collections.Concurrent; using System.Reflection; using System.Text; using Apache.IoTDB; @@ -9,6 +10,7 @@ using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Logging; +using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IoTDB.Provider { @@ -139,66 +141,98 @@ namespace JiShe.CollectBus.IoTDB.Provider List tempColumnNames = new List(); tempColumnNames.AddRange(metadata.ColumnNames); - foreach (var entity in entities) + var entityTypeAttribute = typeof(T).GetCustomAttribute(); + + if (entityTypeAttribute == null) { - timestamps.Add(entity.Timestamps); - var rowValues = new List(); - foreach (var measurement in tempColumnNames) + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); + } + + if (metadata.EntityType != entityTypeAttribute.EntityType) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); + } + + if(metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); + } + else if (metadata.EntityType == Enums.EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) + { + throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); + } + + + foreach (var entity in entities) { + timestamps.Add(entity.Timestamps); + var rowValues = new List(); - PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); - if (propertyInfo == null) + foreach (var measurement in tempColumnNames) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); - } - var value = propertyInfo.GetValue(entity); - if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && value != null)//表示当前对象是单测点模式 - { - Type tupleType = value.GetType(); - Type[] tupleArgs = tupleType.GetGenericArguments(); - Type item2Type = tupleArgs[1]; // T 的实际类型 - var item1 = tupleType.GetProperty("Item1")!.GetValue(value); - var item2 = tupleType.GetProperty("Item2")!.GetValue(value); - if (item1 == null || item2 == null) + PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); + if (propertyInfo == null) { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); } - var indexOf = metadata.ColumnNames.IndexOf(measurement); - metadata.ColumnNames[indexOf] = (string)item1!; - - rowValues.Add(item2); - - } - else - { - if (value != null) + var value = propertyInfo.GetValue(entity); + if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式 { - rowValues.Add(value); + if (value != null) + { + Type tupleType = value.GetType(); + Type[] tupleArgs = tupleType.GetGenericArguments(); + Type item2Type = tupleArgs[1]; // T 的实际类型 + var item1 = tupleType.GetProperty("Item1")!.GetValue(value); + var item2 = tupleType.GetProperty("Item2")!.GetValue(value); + if (item1 == null || item2 == null) + { + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); + } + + var indexOf = metadata.ColumnNames.IndexOf(measurement); + metadata.ColumnNames[indexOf] = (string)item1!; + + rowValues.Add(item2); + } + else + { + rowValues.Add(null); + } } else { - //填充默认数据值 - DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); - rowValues.Add(defaultValue); + rowValues.Add(value); + + //if (value != null) + //{ + // rowValues.Add(value); + //} + //else + //{ + // ////填充默认数据值 + // //DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue); + + // rowValues.Add(null); + //} } + } - } + values.Add(rowValues); - values.Add(rowValues); - - if (!_runtimeContext.UseTableSessionPool)//树模型 - { - devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + if (!_runtimeContext.UseTableSessionPool)//树模型 + { + devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + } + else + { + devicePaths.Add(DevicePathBuilder.GetTableName()); + } } - else - { - devicePaths.Add(DevicePathBuilder.GetTableName()); - } - } if (devicePaths.Count > 1) { @@ -213,14 +247,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建tree模型的Tablet /// - /// - /// - /// - /// + /// 已解析的设备数据元数据 + /// 设备路径 + /// 数据集合 + /// 时间戳集合 /// private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { + //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 + return new Tablet( devicePath, metadata.ColumnNames, @@ -233,16 +269,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建表模型的Tablet /// - /// - /// - /// - /// + /// 已解析的设备数据元数据 + /// 表名称 + /// 数据集合 + /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath, + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( - devicePath, + tableName, metadata.ColumnNames, metadata.ColumnCategories, metadata.DataTypes, @@ -393,33 +429,35 @@ namespace JiShe.CollectBus.IoTDB.Provider private DeviceMetadata GetMetadata() where T : IoTEntity { - var columns = CollectColumnMetadata(typeof(T)); - var metadata = BuildDeviceMetadata(columns); - - return _metadataCache.AddOrUpdate( - typeof(T), - addValueFactory: t => metadata, // 如果键不存在,用此值添加 - updateValueFactory: (t, existingValue) => + if (_runtimeContext.UseTableSessionPool)//表模型 + { + return _metadataCache.GetOrAdd(typeof(T), type => { - var columns = CollectColumnMetadata(t); - var metadata = BuildDeviceMetadata(columns); - - //对现有值 existingValue 进行修改,返回新值 - existingValue.ColumnNames = metadata.ColumnNames; - return existingValue; - } - ); + var columns = CollectColumnMetadata(type); + var metadata = BuildDeviceMetadata(columns); + return metadata; + }); + } + else + { + // 树模型 + var columns = CollectColumnMetadata(typeof(T)); + var metadata = BuildDeviceMetadata(columns); - //return _metadataCache.GetOrAdd(typeof(T), type => - //{ - // var columns = CollectColumnMetadata(type); - // var metadata = BuildDeviceMetadata(columns); - // //if (metadata.IsSingleMeasuring) - // //{ - // // _metadataCache.Remove(typeof(T)); - // //} - // return metadata; - //}); + return _metadataCache.AddOrUpdate( + typeof(T), + addValueFactory: t => metadata, // 如果键不存在,用此值添加 + updateValueFactory: (t, existingValue) => + { + var columns = CollectColumnMetadata(t); + var metadata = BuildDeviceMetadata(columns); + + //对现有值 existingValue 进行修改,返回新值 + existingValue.ColumnNames = metadata.ColumnNames; + return existingValue; + } + ); + } } /// @@ -483,9 +521,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 构建设备元数据 /// - /// + /// 待解析的类 + /// 已处理好的数据列 /// - private DeviceMetadata BuildDeviceMetadata(List columns) + private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity { var metadata = new DeviceMetadata(); @@ -499,11 +538,20 @@ namespace JiShe.CollectBus.IoTDB.Provider var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); - + ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); + var entityTypeAttribute = typeof(T).GetCustomAttribute(); + + if (entityTypeAttribute == null) + { + throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)}的EntityType 没有指定,属于异常情况,-101"); + } + + metadata.EntityType = entityTypeAttribute.EntityType; + return metadata; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs index 39a584d..fc965e6 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTEntity.cs @@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 系统名称 /// [TAGColumn] - public string SystemName { get; set; } + public required string SystemName { get; set; } /// /// 项目编码 /// [TAGColumn] - public string ProjectCode { get; set; } + public required string ProjectCode { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// [TAGColumn] - public string DeviceType { get; set; } + public required string DeviceType { get; set; } /// /// 设备ID /// [TAGColumn] - public string DeviceId { get; set; } + public required string DeviceId { get; set; } /// /// 当前时间戳,单位毫秒 /// - public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 1a056a7..625c1e6 100644 --- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -42,6 +42,8 @@ namespace JiShe.CollectBus.RedisDataCache Instance = _freeRedisProvider.Instance; } + //todo 单个数据查询 + /// /// 单个添加数据 /// diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 23cfdfb..4970b7f 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -105,6 +105,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await _iotDBProvider.InsertAsync(meter2); _dbContext.UseTableSessionPool = true; + var testTime = Convert.ToDateTime("2025-04-21 08:32:55"); ElectricityMeter meter = new ElectricityMeter() { @@ -115,7 +116,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS MeterModel = "DDZY-1980", ProjectCode = "10059", Voltage = 10, - Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), }; await _iotDBProvider.InsertAsync(meter); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index cd06eea..3c2b652 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -27,8 +27,6 @@ using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; -using static FreeSql.Internal.GlobalFilter; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.ScheduledMeterReading { diff --git a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs index 2d9cc67..a6c09dc 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/ElectricityMeter.cs @@ -33,5 +33,8 @@ namespace JiShe.CollectBus.Ammeters [FIELDColumn] public double Power => Voltage * Current; + + [FIELDColumn] + public double? Currentd { get; set; } } }