From eaf6f22bdc8404b317acb3d500d1b47c26c3dcf2 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 12 May 2025 15:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DIoTDB=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=98=A0=E5=B0=84=E7=B1=BB=E5=9E=8B=E5=BC=82=E5=B8=B8=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.IoTDB/Model/Class1.cs | 12 -- .../Provider/DeviceMetadata.cs | 19 ++- .../Provider/IoTDBProvider.cs | 143 ++++++++++-------- .../Samples/SampleAppService.cs | 88 ++++++++++- .../BasicScheduledMeterReadingService.cs | 2 +- .../IotSystems/Devices/DeviceDataInfo.cs | 22 +++ 6 files changed, 200 insertions(+), 86 deletions(-) delete mode 100644 modules/JiShe.CollectBus.IoTDB/Model/Class1.cs create mode 100644 services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceDataInfo.cs diff --git a/modules/JiShe.CollectBus.IoTDB/Model/Class1.cs b/modules/JiShe.CollectBus.IoTDB/Model/Class1.cs deleted file mode 100644 index 9bcd5ff..0000000 --- a/modules/JiShe.CollectBus.IoTDB/Model/Class1.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.IoTDB.Model -{ - internal class Class1 - { - } -} diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs index 92c33ee..a93ba35 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DeviceMetadata.cs @@ -60,14 +60,29 @@ namespace JiShe.CollectBus.IoTDB.Provider public string ColumnName; /// - /// 值获取委托 + /// 数据类型 + /// + public TSDataType TSDataType { get; set;} + + /// + /// 值获取委托(参数:实体对象) /// public Func ValueGetter; + /// + /// 值设置委托(参数:实体对象,新值) + /// + public Action ValueSetter; + /// /// 类型转换委托 /// - public Func Converter; + public Func GetConverter; + + /// + /// 类型转换委托 + /// + public Func SetConverter; /// /// 是否单测点 diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 6d77ce3..88c6b92 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -274,7 +274,6 @@ namespace JiShe.CollectBus.IoTDB.Provider var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); var result = new BusPagedResult { TotalCount = await GetTotalCount(options), @@ -284,7 +283,7 @@ namespace JiShe.CollectBus.IoTDB.Provider }; stopwatch2.Stop(); - _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); + //int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize); if (result.Items.Count() < result.PageSize) @@ -467,7 +466,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { var metadata = await GetMetadata(); - var sb = new StringBuilder("SELECT TIME as Timestamps,"); + var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); @@ -577,12 +576,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var accessor = SourceEntityAccessorFactory.GetAccessor(); var memberCache = BuildMemberCache(accessor); - - var columns = new List() { "Timestamps" }; - var dataTypes = new List() { TSDataType.TIMESTAMP }; - columns.AddRange(metadata.ColumnNames); - dataTypes.AddRange(metadata.DataTypes); - + while (dataSet.HasNext() && results.Count < pageSize) { @@ -592,30 +586,12 @@ namespace JiShe.CollectBus.IoTDB.Provider Timestamps = record.Timestamps }; - foreach (var measurement in columns) + for (int i = 0; i < metadata.Processors.Count; i++) { - int indexOf = columns.IndexOf(measurement); - var value = record.Values[indexOf]; - TSDataType tSDataType = dataTypes[indexOf]; - - if (!memberCache.TryGetValue(measurement, out var member) && !(value is System.DBNull)) - { - throw new Exception($"{nameof(ParseResults)} 解析查询结果 {accessor.EntityName} 属性赋值出现异常,没有找到{measurement}对应的 member信息"); - } - - dynamic tempValue = GetTSDataValue(tSDataType, value); - - if (measurement.ToLower().EndsWith("time")) - { - member.Setter(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds)); - } - else - { - member.Setter(entity, tempValue); - } - + var value = record.Values[i]; + metadata.Processors[i].ValueSetter(entity, value); } - + results.Add(entity); } @@ -715,7 +691,9 @@ namespace JiShe.CollectBus.IoTDB.Provider { ColumnName = column.Name, IsSingleMeasuring = column.IsSingleMeasuring, - Converter = CreateConverter(column.DeclaredTypeName.ToUpper()) + GetConverter = GetterConverter(column.DeclaredTypeName.ToUpper()), + SetConverter = SetterConverter(column.Name.ToUpper()), + TSDataType = column.DataType, }; // 处理单测点 @@ -730,37 +708,7 @@ namespace JiShe.CollectBus.IoTDB.Provider object rawValue = item1Member.Getter(obj); string value = rawValue?.ToString(); - if (!string.IsNullOrWhiteSpace(value)) - { - // 规则1: 严格检查ASCII字母和数字(0-9, A-Z, a-z) - bool hasInvalidChars = value.Any(c => - !((c >= 'A' && c <= 'Z') || - (c >= 'a' && c <= 'z') || - (c >= '0' && c <= '9'))); - - // 规则2: 首字符不能是数字 - bool startsWithDigit = value[0] >= '0' && value[0] <= '9'; - - // 规则3: 全字符串不能都是数字 - bool allDigits = value.All(c => c >= '0' && c <= '9'); - - // 按优先级抛出具体异常 - if (hasInvalidChars) - { - throw new InvalidOperationException( - $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字"); - } - else if (startsWithDigit) - { - throw new InvalidOperationException( - $"SingleMeasuring name '{value}' 不能以数字开头"); - } - else if (allDigits) - { - throw new InvalidOperationException( - $"SingleMeasuring name '{value}' 不能全为数字"); - } - } + ValidateSingleMeasuringName(value); return value; }; @@ -769,7 +717,7 @@ namespace JiShe.CollectBus.IoTDB.Provider .First(m => m.NameOrPath == $"{column.Name}.Item2"); processor.ValueGetter = (obj) => { object rawValue = item2Member.Getter(obj); - return processor.Converter(rawValue); + return processor.GetConverter(rawValue); }; } else @@ -778,7 +726,15 @@ namespace JiShe.CollectBus.IoTDB.Provider var member = accessor.MemberList.First(m => m.NameOrPath == column.Name); processor.ValueGetter = (obj) => { object rawValue = member.Getter(obj); - return processor.Converter(rawValue); + return processor.GetConverter(rawValue); + }; + + //对应的属性成员进行赋值 + processor.ValueSetter = (obj, value) => + { + dynamic tempValue = GetTSDataValue(processor.TSDataType, value); + var rawValue = processor.SetConverter(value); + member.Setter(obj, rawValue); }; } @@ -788,7 +744,52 @@ namespace JiShe.CollectBus.IoTDB.Provider return metadata; } - private Func CreateConverter(string declaredTypeName) + /// + /// 验证单测点名称格式 + /// + private void ValidateSingleMeasuringName(string value) + { + if (string.IsNullOrWhiteSpace(value)) + { + return; + } + + // 规则1: 严格检查ASCII字母和数字(0-9, A-Z, a-z) + bool hasInvalidChars = value.Any(c => + !((c >= 'A' && c <= 'Z') || + (c >= 'a' && c <= 'z') || + (c >= '0' && c <= '9'))); + + // 规则2: 首字符不能是数字 + bool startsWithDigit = value[0] >= '0' && value[0] <= '9'; + + // 规则3: 全字符串不能都是数字 + bool allDigits = value.All(c => c >= '0' && c <= '9'); + + // 按优先级抛出具体异常 + if (hasInvalidChars) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字"); + } + else if (startsWithDigit) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能以数字开头"); + } + else if (allDigits) + { + throw new InvalidOperationException( + $"SingleMeasuring name '{value}' 不能全为数字"); + } + } + + /// + /// 取值的处理器 + /// + /// + /// + private Func GetterConverter(string declaredTypeName) { return declaredTypeName switch { @@ -797,6 +798,16 @@ namespace JiShe.CollectBus.IoTDB.Provider }; } + /// + /// 设置值的处理 + /// + /// + /// + private Func SetterConverter(string columnName) => + columnName.ToLower().EndsWith("time") + ? value => new DateTime(Convert.ToInt64(value), DateTimeKind.Utc) + : value => value; + /// /// 处理不同列类型的逻辑 /// diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index ffbfadb..3035633 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Ammeters; +using FreeSql.Internal.CommonProvider; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -10,7 +11,10 @@ using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IotSystems.Ammeters; +using JiShe.CollectBus.IotSystems.Devices; +using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; @@ -27,6 +31,7 @@ using System.Diagnostics; using System.Threading.Tasks; using TouchSocket.Core; using TouchSocket.Sockets; +using static IdentityModel.ClaimComparer; namespace JiShe.CollectBus.Samples; @@ -205,18 +210,63 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time) { time = DateTime.Now; - var meter = new TreeModelSingleMeasuringEntity() + var meter = new TreeModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), - SingleMeasuring = (measuring, value) + SingleMeasuring = (measuring, true) }; await _iotDBProvider.InsertAsync(meter); } + /// + /// 测试树模型单个测点数据项查询 + /// + /// + /// + [HttpGet] + public async Task TestTreeModelSingleMeasuringEntityQuery() + { + + var time = DateTime.Now; + + var meter = new TreeModelSingleMeasuringEntity() + { + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + ProjectId = "10059", + Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), + SingleMeasuring = ("measuring", true) + }; + + + + + QueryCondition conditions = new QueryCondition() + { + Field = "DeviceId", + Operator = "=", + Value = meter.DeviceId + }; + + + var query = new IoTDBQueryOptions() + { + TableNameOrTreePath = meter.DevicePath, + PageIndex = 1, + PageSize = 1, + Conditions = new List() { conditions }, + }; + + var pageResult = await _iotDBProvider.QueryAsync(query); + + await _iotDBProvider.InsertAsync(meter); + } + /// /// 测试表模型单个测点数据项 /// @@ -250,14 +300,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { time = DateTime.Now; - var meter = new TableModelSingleMeasuringEntity() + var meter = new TableModelSingleMeasuringEntity() { SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), - SingleColumn = (measuring, true) + SingleColumn = (measuring, value) }; _dbContext.UseTableSessionPool = true; await _iotDBProvider.InsertAsync(meter); @@ -462,5 +512,33 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS // 测试不管是否上线都ACK return SubscribeAck.Success(); } + + + /// + /// 测试Redis批量读取10万条数据性能 + /// + /// + [HttpGet] + public async Task TestRedisCacheGetData(string scores) + { + var timeDensity = "15"; + string SystemType = "Energy"; + string ServerTagName = "JiSheCollectBus5"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + var page = await _redisDataCacheService.GetSingleData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + "973219481:17", + pageSize: 1000, + lastScore: 100, + lastMember: "memberId", + descending: true + ); + + await Task.CompletedTask; + } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 7b407b4..c8826ba 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - //return; + return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceDataInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceDataInfo.cs new file mode 100644 index 0000000..a3a0e21 --- /dev/null +++ b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceDataInfo.cs @@ -0,0 +1,22 @@ +using JiShe.CollectBus.Analyzers.Shared; +using JiShe.CollectBus.IoTDB.Attributes; +using JiShe.CollectBus.IoTDB.Model; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IotSystems.Devices +{ + /// + /// 设备树模型数据信息 + /// + [SourceAnalyzers(EntityTypeEnum.TableModel)] + public class DeviceTreeModelDataInfo: IoTEntity + { + + [FIELDColumn] + public bool xfdsa { get; set; } + } +}