From a6d970af19d6fbf2020447bc31ac6765862833eb Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 21 Apr 2025 22:57:49 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96IoTDB=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E6=98=A0=E5=B0=84=EF=BC=8C=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=BE=AE=E7=A7=92=E5=92=8C=E7=BA=B3=E7=A7=92=E7=9A=84=E6=89=A9?= =?UTF-8?q?=E5=B1=95=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusIoTDBModule.cs | 4 +- .../Context/IoTDBRuntimeContext.cs | 5 +- .../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 9 +- .../Provider/DevicePathBuilder.cs | 4 +- .../Provider/IoTDBProvider.cs | 114 +++-- .../Internal/KafkaOptionConfig.cs | 2 +- .../Samples/SampleAppService.cs | 22 +- .../BasicScheduledMeterReadingService.cs | 402 +++++++++--------- ...nergySystemScheduledMeterReadingService.cs | 8 +- .../MeterReadingTelemetryPacketInfo.cs | 102 +++-- .../Encrypt/EncryptUtil.cs | 65 +++ .../Extensions/DateTimeExtensions.cs | 38 +- .../Extensions/DateTimeOffsetExtensions.cs | 101 +++++ .../Helpers/CommonHelper.cs | 4 +- .../Pages/Monitor.cshtml | 2 +- web/JiShe.CollectBus.Host/appsettings.json | 3 +- 16 files changed, 547 insertions(+), 338 deletions(-) create mode 100644 shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs create mode 100644 shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs index 2cce113..6d26bdc 100644 --- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs +++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs @@ -16,7 +16,7 @@ public class CollectBusIoTDbModule : AbpModule var configuration = context.Services.GetConfiguration(); Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); - // 注册上下文为Scoped - context.Services.AddScoped(); + //// 注册上下文为Scoped + //context.Services.AddScoped(); } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index afad488..ef68325 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -1,16 +1,17 @@ using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Context { /// /// IoTDB SessionPool 运行时上下文 /// - public class IoTDbRuntimeContext + public class IoTDBRuntimeContext: IScopedDependency { private readonly bool _defaultValue; - public IoTDbRuntimeContext(IOptions options) + public IoTDBRuntimeContext(IOptions options) { _defaultValue = options.Value.UseTableSessionPoolByDefault; UseTableSessionPool = _defaultValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 73a90e0..f3cdbe9 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDB.Model /// 项目编码 /// [TAGColumn] - public required string ProjectCode { get; set; } + public required string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 @@ -32,8 +32,13 @@ namespace JiShe.CollectBus.IoTDB.Model public required string DeviceId { get; set; } /// - /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取 + /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + + /// + /// 数据创建时间戳,单位毫秒,必须通过DateTimeOffset获取 + /// + public virtual long CreationTime { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index f9127db..6a1a596 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9b76a6f..b9c2cf0 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; private readonly IIoTDbSessionFactory _sessionFactory; - private readonly IoTDbRuntimeContext _runtimeContext; + private readonly IoTDBRuntimeContext _runtimeContext; private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); @@ -38,7 +38,7 @@ namespace JiShe.CollectBus.IoTDB.Provider public IoTDbProvider( ILogger logger, IIoTDbSessionFactory sessionFactory, - IoTDbRuntimeContext runtimeContext) + IoTDBRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; @@ -54,11 +54,19 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task InsertAsync(T entity) where T : IoTEntity { - var metadata = GetMetadata(); + try + { + var metadata = GetMetadata(); - var tablet = BuildTablet(new[] { entity }, metadata); + var tablet = BuildTablet(new[] { entity }, metadata); - await CurrentSession.InsertAsync(tablet); + await CurrentSession.InsertAsync(tablet); + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常"); + throw; + } } /// @@ -68,15 +76,23 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { - var metadata = GetMetadata(); - - var batchSize = 1000; - var batches = entities.Chunk(batchSize); - - foreach (var batch in batches) + try { - var tablet = BuildTablet(batch, metadata); - await CurrentSession.InsertAsync(tablet); + var metadata = GetMetadata(); + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, metadata); + await CurrentSession.InsertAsync(tablet); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + throw; } } @@ -89,18 +105,26 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task DeleteAsync(QueryOptions options) where T : IoTEntity { - var query = BuildDeleteSQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - if (!sessionDataSet.HasNext()) + try { - _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); - return 0; - } + var query = BuildDeleteSQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - //获取唯一结果行 - var row = sessionDataSet.Next(); - return row.Values[0]; + if (!sessionDataSet.HasNext()) + { + _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); + return 0; + } + + //获取唯一结果行 + var row = sessionDataSet.Next(); + return row.Values[0]; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常"); + throw; + } } /// @@ -111,16 +135,24 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { - var query = BuildQuerySQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - - var result = new BusPagedResult + try { - TotalCount = await GetTotalCount(options), - Items = ParseResults(sessionDataSet, options.PageSize) - }; + var query = BuildQuerySQL(options); + var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - return result; + var result = new BusPagedResult + { + TotalCount = await GetTotalCount(options), + Items = ParseResults(sessionDataSet, options.PageSize) + }; + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, $"{nameof(QueryAsync)} 查询数据时发生异常"); + throw; + } } /// @@ -160,12 +192,12 @@ namespace JiShe.CollectBus.IoTDB.Provider } string tableNameOrTreePath = string.Empty; - var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); + var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); if (tableNameOrTreePathAttribute != null) { - tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; + tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; } - + foreach (var entity in entities) { timestamps.Add(entity.Timestamps); @@ -214,7 +246,7 @@ namespace JiShe.CollectBus.IoTDB.Provider else { - rowValues.Add(value); + rowValues.Add(value); } } @@ -237,7 +269,7 @@ namespace JiShe.CollectBus.IoTDB.Provider devicePaths.Add(DevicePathBuilder.GetTableName()); } } - + } if (devicePaths.Count > 1) @@ -258,8 +290,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, - List> values, List timestamps) + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 @@ -280,8 +311,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// 数据集合 /// 时间戳集合 /// - private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, - List> values, List timestamps) + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( tableName, @@ -639,7 +669,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = TSDataType.DOUBLE, ["TEXT"] = TSDataType.TEXT, ["NULLTYPE"] = TSDataType.NONE, - ["TIMESTAMP"] = TSDataType.TIMESTAMP, + ["DATETIME"] = TSDataType.TIMESTAMP, ["DATE"] = TSDataType.DATE, ["BLOB"] = TSDataType.BLOB, ["DECIMAL"] = TSDataType.STRING, @@ -659,7 +689,7 @@ namespace JiShe.CollectBus.IoTDB.Provider ["DOUBLE"] = 0.0d, ["TEXT"] = string.Empty, ["NULLTYPE"] = null, - ["TIMESTAMP"] = null, + ["DATETIME"] = null, ["DATE"] = null, ["BLOB"] = null, ["DECIMAL"] = "0.0", diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 5f7bdf1..3bea5f1 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -57,5 +57,5 @@ public class KafkaOptionConfig /// /// 首次采集时间 /// - public DateTime FirstCollectionTime { get; set; } + public DateTime? FirstCollectionTime { get; set; } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 7693636..b616544 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -30,12 +30,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS { private readonly ILogger _logger; private readonly IIoTDbProvider _iotDBProvider; - private readonly IoTDbRuntimeContext _dbContext; + private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, - IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; @@ -60,7 +60,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "messageHexString", Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), @@ -83,7 +83,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; @@ -99,7 +99,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), }; @@ -122,7 +122,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, IssuedMessageHexString = "dsdfsfd", Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(), @@ -141,7 +141,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS DeviceType = "Ammeter", Current = 10, MeterModel = "DDZY-1980", - ProjectCode = "10059", + ProjectId = "10059", Voltage = 10, Currentd = 22, IssuedMessageHexString = "dsdfsfd", @@ -163,7 +163,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -183,7 +183,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleMeasuring = new Tuple(measuring, value) }; @@ -203,7 +203,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; @@ -224,7 +224,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS SystemName = "energy", DeviceId = "402440506", DeviceType = "Ammeter", - ProjectCode = "10059", + ProjectId = "10059", Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), SingleColumn = new Tuple(measuring, value) }; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4ff5dd6..0fb4c49 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,6 @@ -using DnsClient.Protocol; +using Confluent.Kafka; +using DnsClient.Protocol; +using FreeSql; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; @@ -9,6 +11,7 @@ using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; @@ -19,6 +22,8 @@ using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; using Mapster; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -38,22 +43,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading { private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; - private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; + private readonly IoTDBRuntimeContext _runtimeContext; public BasicScheduledMeterReadingService( ILogger logger, - IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, + IoTDBRuntimeContext runtimeContext, IOptions kafkaOptions) { _logger = logger; _dbProvider = dbProvider; - _meterReadingRecordRepository = meterReadingRecordRepository; + _runtimeContext = runtimeContext; _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; @@ -133,17 +138,49 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}"); + List pushTaskInfos = new(); - _ = CreateMeterPublishTask( + await CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), meterType: MeterTypeEnum.Ammeter, - taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + if (tempTask == null || tempTask.Count <= 0) + { + return; + } + + pushTaskInfos.AddRange(tempTask); + + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(tempTask); + //} + + _runtimeContext.UseTableSessionPool = true; + _dbProvider.BatchInsertAsync(tempTask); }); + //if (pushTaskInfos.Count <= 0) + //{ + // _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有任务数据信息,-1051"); + // continue; + //} + + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(pushTaskInfos); + //} + + //_dbContext.UseTableSessionPool = true; + //await _dbProvider.BatchInsertAsync(pushTaskInfos); + } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -152,7 +189,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, - taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + nextTaskTime: tasksToBeIssueModel.NextTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { @@ -193,11 +230,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - return; - - - +#if DEBUG var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; @@ -237,8 +270,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -261,13 +294,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + if (_kafkaOptions.FirstCollectionTime.HasValue == false) + { + _kafkaOptions.FirstCollectionTime = DateTime.Now; + } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -465,60 +502,60 @@ namespace JiShe.CollectBus.ScheduledMeterReading } - /// - /// 创建电表待发送的任务数据 - /// - /// 采集频率 - /// 时间格式的任务批次名称 - /// - private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) - { - var timer = Stopwatch.StartNew(); + ///// + ///// 创建电表待发送的任务数据 + ///// + ///// 采集频率 + ///// 时间格式的任务批次名称 + ///// + //private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) + //{ + // var timer = Stopwatch.StartNew(); - //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // //获取对应频率中的所有电表信息 + // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + // List meterInfos = new List(); + // decimal? cursor = null; + // string member = null; + // bool hasNext; + // do + // { + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + // } while (hasNext); - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } + // if (meterInfos == null || meterInfos.Count <= 0) + // { + // timer.Stop(); + // _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + // return; + // } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); - } - ); + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + // } + // ); - timer.Stop(); - _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); - } + // timer.Stop(); + // _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + //} /// @@ -527,38 +564,33 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 采集频率 /// 电表信息 /// 集中器所在分组 - /// 时间格式的任务批次名称 + /// 采集频率对应的时间戳 /// - private void AmmerterCreatePublishTaskAction(int timeDensity - , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) + private List AmmerterCreatePublishTaskAction(int timeDensity + , AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) { + var currentTime = DateTime.Now; + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); - return; + return null; } //载波的不处理 if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); - return; + return null; } if (ammeterInfo.State.Equals(2)) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); - return; + return null; } ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 @@ -571,22 +603,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); - return; + return null; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); - return; + return null; } if (Convert.ToInt32(ammeterInfo.Address) > 65535) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); - return; + return null; } if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) { //_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); - return; + return null; } List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; @@ -613,7 +645,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (tempSubCodes == null || tempSubCodes.Count <= 0) { //_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); - return; + return null; } else { @@ -683,18 +715,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { - ProjectID = ammeterInfo.ProjectID, + SystemName = SystemType, + ProjectId = $"{ammeterInfo.ProjectID}", + DeviceType = $"{MeterTypeEnum.Ammeter}", + DeviceId = $"{ammeterInfo.MemberId}", + Timestamps = timestamps.GetDateTimeOffset().ToUnixTimeMilliseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, + PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - MeterId = ammeterInfo.MeterId, - MeterType = MeterTypeEnum.Ammeter, FocusAddress = ammeterInfo.FocusAddress, - FocusId = ammeterInfo.FocusId, - AFN = aFN, + AFN = (int)aFN, Fn = fn, - Seq = builderResponse.Seq, + //Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = tempItem, TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA), @@ -709,37 +742,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading taskList.Add(meterReadingRecords); } - if (taskList == null - || taskList.Count() <= 0 - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) - || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); - return; - } + return taskList; - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - foreach (var item in taskList) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize()); + //using (var score = _serviceProvider.CreateScope()) + //{ + // var _dbContext = score.ServiceProvider.GetRequiredService(); + // _dbContext.UseTableSessionPool = true; + // _dbProvider.BatchInsertAsync(taskList); + //} - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId); - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId); - } - pipe.EndPipe(); - } - - //await _redisDataCacheService.BatchInsertDataAsync( - // redisCacheTelemetryPacketInfoHashKey, - // redisCacheTelemetryPacketInfoSetIndexKey, - // redisCacheTelemetryPacketInfoZSetScoresIndexKey, - // taskList); } #endregion @@ -864,7 +876,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } ////删除任务数据 @@ -877,52 +889,52 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - /// - /// 创建水表待发送的任务数据 - /// - /// 采集频率 - /// 水表信息 - /// 集中器所在分组 - /// 时间格式的任务批次名称 - /// - private void WatermeterCreatePublishTaskAction(int timeDensity - , WatermeterInfo meterInfo, int groupIndex, string taskBatch) - { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + ///// + ///// 创建水表待发送的任务数据 + ///// + ///// 采集频率 + ///// 水表信息 + ///// 集中器所在分组 + ///// 时间格式的任务批次名称 + ///// + //private void WatermeterCreatePublishTaskAction(int timeDensity + // , WatermeterInfo meterInfo, int groupIndex, string taskBatch) + //{ + // var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + // var currentTime = DateTime.Now; + // var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var taskInfo = new MeterReadingTelemetryPacketInfo() - { - Seq= null, - - }; - // + // var taskInfo = new MeterReadingTelemetryPacketInfo() + // { + // Seq= null, - Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + // }; + // // - using (var pipe = FreeRedisProvider.Instance.StartPipe()) - { - // 主数据存储Hash - pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); + // Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); - // Set索引缓存 - pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); + // using (var pipe = FreeRedisProvider.Instance.StartPipe()) + // { + // // 主数据存储Hash + // pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); - // ZSET索引缓存Key - pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); + // // Set索引缓存 + // pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); - pipe.EndPipe(); - } + // // ZSET索引缓存Key + // pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); - } + // pipe.EndPipe(); + // } + + //} #endregion @@ -961,11 +973,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 创建表的待发送的任务数据 /// /// 采集频率 - /// 时间格式的任务批次名称 + /// 采集频率对应的任务时间戳 /// 表类型 /// 具体的创建任务的委托 /// - private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + private async Task CreateMeterPublishTask(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel { var timer = Stopwatch.StartNew(); @@ -978,20 +990,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading decimal? cursor = null; string member = null; bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); + + + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1, + lastScore: cursor, + lastMember: member); + meterInfos.AddRange(page.Items); if (meterInfos == null || meterInfos.Count <= 0) { @@ -1000,13 +1021,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - taskCreateAction(timeDensity, data, groupIndex, taskBatch); + taskCreateAction(timeDensity, data, groupIndex, nextTaskTime); } ); @@ -1034,29 +1054,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading string member = null; bool hasNext; var stopwatch = Stopwatch.StartNew(); - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: page.Items, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); - } - ); + // await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + // items: page.Items, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, groupIndex) => + // { + // _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + // } + // ); - } while (hasNext); + //} while (hasNext); stopwatch.Stop(); _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index b8fd08b..fe0746f 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -8,6 +8,7 @@ using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; +using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -35,18 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { string serverTagName = string.Empty; + public EnergySystemScheduledMeterReadingService( ILogger logger, IIoTDbProvider dbProvider, - IMeterReadingRecordRepository meterReadingRecordRepository, - IOptions kafkaOptions, + IOptions kafkaOptions, + IoTDBRuntimeContext runtimeContext, IProducerService producerService, IRedisDataCacheService redisDataCacheService) : base(logger, - meterReadingRecordRepository, producerService, redisDataCacheService, dbProvider, + runtimeContext, kafkaOptions) { serverTagName = kafkaOptions.Value.ServerTagName; diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 3aafa41..3bffbb8 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -1,5 +1,9 @@ -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Encrypt; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IoTDB.Attribute; +using JiShe.CollectBus.IoTDB.Enums; +using JiShe.CollectBus.IoTDB.Model; using System; using System.Collections.Generic; using System.Linq; @@ -13,78 +17,85 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 抄读任务Redis缓存数据记录 /// - public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel + [EntityType(EntityTypeEnum.TableModel)] + public class MeterReadingTelemetryPacketInfo : IoTEntity { /// - /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// 排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳、或者某一个固定的标识1 /// - public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}"; - - /// - /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 - /// - public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; - - + [FIELDColumn] + public string ScoreValue + { + get + { + return $"{FocusAddress}.{TaskMark}".Md5Fun(); + } + } + /// /// 是否手动操作 /// + [FIELDColumn] public bool ManualOrNot { get; set; } /// /// 任务数据唯一标记 /// - public decimal TaskMark { get; set; } - - /// - /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 - /// - public long Timestamps { get; set; } + [FIELDColumn] + public string TaskMark { get; set; } /// /// 是否超时 /// + [FIELDColumn] public bool IsTimeout { get; set; } = false; /// /// 待抄读时间 /// + [FIELDColumn] public DateTime PendingCopyReadTime { get; set; } - - + + /// + /// 集中器Id + /// + [FIELDColumn] + public int FocusId { get; set; } + + /// + /// 表Id + /// + [FIELDColumn] + public int MeterId { get; set; } + /// /// 集中器地址 /// + [FIELDColumn] public string FocusAddress { get; set; } - + /// /// 表地址 /// + [FIELDColumn] public string MeterAddress { get; set; } - /// - /// 表类型 - /// - public MeterTypeEnum MeterType { get; set; } - - /// - /// 项目ID - /// - public int ProjectID { get; set; } - /// /// 数据库业务ID /// + [FIELDColumn] public int DatabaseBusiID { get; set; } /// /// AFN功能码 /// - public AFN AFN { get; set; } + [FIELDColumn] + public int AFN { get; set; } /// /// 抄读功能码 /// + [FIELDColumn] public int Fn { get; set; } /// @@ -95,66 +106,73 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords /// /// 采集项编码 /// - public string ItemCode { get; set;} + [FIELDColumn] + public string ItemCode { get; set; } - /// - /// 帧序列域SEQ - /// - public required Seq Seq { get; set; } + ///// + ///// 帧序列域SEQ + ///// + //public required Seq Seq { get; set; } /// /// 地址域A3的主站地址MSA /// + [FIELDColumn] public int MSA { get; set; } /// /// 是否发送 /// + [FIELDColumn] public bool IsSend { get; set; } /// /// 创建时间 /// + [FIELDColumn] public DateTime CreationTime { get; set; } /// /// 下发消息内容 /// + [FIELDColumn] public string IssuedMessageHexString { get; set; } /// /// 下发消息Id /// + [FIELDColumn] public string IssuedMessageId { get; set; } /// /// 消息上报内容 /// + [FIELDColumn] public string? ReceivedMessageHexString { get; set; } /// /// 消息上报时间 /// + [FIELDColumn] public DateTime? ReceivedTime { get; set; } /// /// 上报消息Id /// - public string ReceivedMessageId { get; set; } + [FIELDColumn] + public string ReceivedMessageId { get; set; } /// /// 上报报文解析备注,异常情况下才有 /// + [FIELDColumn] public string ReceivedRemark { get; set; } /// /// 是否已上报 /// + [FIELDColumn] public bool IsReceived { get; set; } - - //public void CreateDataId(Guid Id) - //{ - // this.Id = Id; - //} + } } diff --git a/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs new file mode 100644 index 0000000..72cdf41 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Encrypt/EncryptUtil.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Encrypt +{ + /// + /// 各种加密辅助类 + /// + public static class EncryptUtil + { + #region MD5加密 + + /// + /// MD5加密 + /// + public static string Md5Fun(this string value) + { + if (value == null) + { + throw new ArgumentNullException("未将对象引用设置到对象的实例。"); + } + + var encoding = Encoding.UTF8; + MD5 md5 = MD5.Create(); + return HashAlgorithmBase(md5, value, encoding); + } + + /// + /// 加权MD5加密 + /// + public static string Md5Fun(this string value, string salt) + { + return salt == null ? value.Md5Fun() : (value + "『" + salt + "』").Md5Fun(); + } + + #endregion + + /// + /// HashAlgorithm 加密统一方法 + /// + private static string HashAlgorithmBase(HashAlgorithm hashAlgorithmObj, string source, Encoding encoding) + { + byte[] btStr = encoding.GetBytes(source); + byte[] hashStr = hashAlgorithmObj.ComputeHash(btStr); + return hashStr.Bytes2Str(); + } + + /// + /// 转换成字符串 + /// + private static string Bytes2Str(this IEnumerable source, string formatStr = "{0:X2}") + { + StringBuilder pwd = new StringBuilder(); + foreach (byte btStr in source) + { + pwd.AppendFormat(formatStr, btStr); + } + return pwd.ToString(); + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 2bdcf6c..e72f4fd 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -181,25 +181,7 @@ namespace JiShe.CollectBus.Common.Extensions return $"{dateTime:yyyyMMddHH}"; #endif } - - /// - /// 获取当前时间毫秒级时间戳 - /// - /// - public static long GetCurrentTimeMillis() - { - return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - } - - /// - /// 将Unix时间戳转换为日期时间 - /// - /// - /// - public static DateTime FromUnixMillis(long millis) - { - return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; - } + /// /// 采集时间节点计算 @@ -233,22 +215,6 @@ namespace JiShe.CollectBus.Common.Extensions .AddHours(hours) .AddMinutes(minutes); } - - - /// - /// 将 DateTime 时间转换为 DateTimeOffset 时间 - /// - /// - /// - public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) - { - //确保 Kind 为 Local(如果是 Unspecified) - DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified - ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) - : rawDateTime; - - // 转换为 DateTimeOffset(自动应用本地时区偏移) - return new DateTimeOffset(localDateTime); - } + } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs new file mode 100644 index 0000000..c73fbd0 --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeOffsetExtensions.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.ComponentModel; +using System.Linq; + +namespace JiShe.CollectBus.Common.Extensions +{ + public static class DateTimeOffsetExtensions + { + + /// + /// 获取当前时间毫秒级时间戳 + /// + /// + public static long GetCurrentTimeMillis() + { + return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + /// + /// 将Unix时间戳转换为日期时间 + /// + /// + /// + public static DateTime FromUnixMillis(long millis) + { + return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; + } + + /// + /// 采集时间节点计算 + /// + /// 待采集时间 + /// + /// + public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval) + { + // 计算精确到分钟的基准时间 + var baseTime = new DateTime( + referenceTime.Year, + referenceTime.Month, + referenceTime.Day, + referenceTime.Hour, + referenceTime.Minute, + 0); + + // 计算总分钟数和下一个间隔点 + int totalMinutes = baseTime.Hour * 60 + baseTime.Minute; + int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval; + + // 处理跨天情况 + int daysToAdd = nextTotalMinutes / (24 * 60); + int remainingMinutes = nextTotalMinutes % (24 * 60); + int hours = remainingMinutes / 60; + int minutes = remainingMinutes % 60; + + return baseTime.Date + .AddDays(daysToAdd) + .AddHours(hours) + .AddMinutes(minutes); + } + + + /// + /// 将 DateTime 时间转换为 DateTimeOffset 时间 + /// + /// + /// + public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime) + { + //确保 Kind 为 Local(如果是 Unspecified) + DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified + ? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local) + : rawDateTime; + + // 转换为 DateTimeOffset(自动应用本地时区偏移) + return new DateTimeOffset(localDateTime); + } + + private static readonly long UnixEpochTicks = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero).Ticks; + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的微秒数 + /// + public static long ToUnixTimeMicroseconds(this DateTimeOffset dateTimeOffset) + { + // Ticks 单位是 100 纳秒,转换为微秒需除以 10 + long elapsedTicks = dateTimeOffset.Ticks - UnixEpochTicks; + return elapsedTicks / 10; // 1 微秒 = 1000 纳秒 = 10 Ticks + } + + /// + /// 获取当前 DateTimeOffset 距离 Unix 纪元(1970-01-01)的纳秒数 + /// + public static long ToUnixTimeNanoseconds(this DateTimeOffset dateTimeOffset) + { + long nanoseconds = (dateTimeOffset.Ticks - UnixEpochTicks) * 100; + return nanoseconds; + } + } +} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 3c36d23..34cf37f 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -769,11 +769,11 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// - public static decimal GetTaskMark(int afn, int fn, int pn, int msa) + public static string GetTaskMark(int afn, int fn, int pn, int msa) { var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; - return Convert.ToInt32(makstr) << 32 | msa; + return makstr;// Convert.ToInt32(makstr) << 32 | msa; } } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index afe25da..88209c2 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +