From fe51402b1f105dc536f08ec2249d4ab13fa8f172 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Fri, 18 Apr 2025 17:46:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=94=B5=E8=A1=A8=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E6=8A=84=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../KafkaOptionConfig.cs | 5 + .../Samples/SampleAppService.cs | 14 + .../BasicScheduledMeterReadingService.cs | 607 +++++++++--------- .../Workers/CreateToBeIssueTaskWorker.cs | 7 +- .../Workers/SubscriberFifteenMinuteWorker.cs | 6 +- .../Workers/SubscriberFiveMinuteWorker.cs | 6 +- .../Workers/SubscriberOneMinuteWorker.cs | 6 +- .../Ammeters/AmmeterInfo.cs | 7 +- .../IotSystems/Watermeter/WatermeterInfo.cs | 22 +- .../Extensions/DateTimeExtensions.cs | 33 + .../Models/DeviceCacheBasicModel.cs | 5 + .../Pages/Monitor.cshtml | 2 +- web/JiShe.CollectBus.Host/appsettings.json | 6 +- 13 files changed, 400 insertions(+), 326 deletions(-) diff --git a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs index e592ea2..241746b 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaOptionConfig.cs @@ -59,5 +59,10 @@ namespace JiShe.CollectBus.Kafka /// public string? SaslPassword { get; set; } + /// + /// 首次采集时间 + /// + public DateTime FirstCollectionTime { get; set; } + } } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index f0aa541..23cfdfb 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -27,6 +27,7 @@ using System.Diagnostics; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; +using JiShe.CollectBus.Common.Extensions; namespace JiShe.CollectBus.Samples; @@ -243,6 +244,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS } + /// + /// 下一个采集时间点验证 + /// + /// + [HttpGet] + public async Task TestCalculateNextCollectionTime(string time, int timeDensity) + { + DateTime nextTaskTime = Convert.ToDateTime(time); + + return await Task.FromResult(nextTaskTime.CalculateNextCollectionTime(timeDensity)); + } + + public Task GetAsync() { return Task.FromResult( diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index a760936..cd06eea 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Ammeters; +using DnsClient.Protocol; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; @@ -27,6 +28,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using static FreeSql.Internal.GlobalFilter; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -101,6 +103,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } + var currentTime = DateTime.Now; + foreach (var item in taskInfos) { var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item); @@ -130,70 +134,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - var timer = Stopwatch.StartNew(); + //_ = AmmerterCreatePublishTask(timeDensity, $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}"); - //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); - - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); - - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } - //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - - - //处理数据 - //await DeviceGroupBalanceControl.ProcessGenericListAsync( - // items: meterInfos, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, groupIndex) => - // { - // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); - // } - //); - - - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => + _ = CreateMeterPublishTask( + timeDensity: timeDensity, + taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + meterType: MeterTypeEnum.Ammeter, + taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { - AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); - } - ); - - timer.Stop(); - _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + }); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { //todo 水表任务创建待处理 //await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos); + + _ = CreateMeterPublishTask( + timeDensity: timeDensity, + taskBatch: $"{tasksToBeIssueModel.NextTaskTime:yyyyMMddHHmm00}", + meterType: MeterTypeEnum.Ammeter, + taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => + { + //AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + }); } else { @@ -205,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 - tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity); + tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } } @@ -229,24 +194,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { -#if DEBUG - //var timeDensity = "15"; - //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; - ////获取缓存中的电表信息 - //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; - - //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); - ////List focusAddressDataLista = new List(); - //List meterInfos = new List(); - //foreach (var item in tempMeterInfos) - //{ - // var tempData = item.Adapt(); - // tempData.FocusId = item.FocusID; - // tempData.MeterId = item.Id; - // meterInfos.Add(tempData); - // //focusAddressDataLista.Add(item.FocusAddress); - //} +#if DEBUG + return; @@ -258,23 +207,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading List meterInfos = new List(); List focusAddressDataLista = new List(); var timer1 = Stopwatch.StartNew(); - //decimal? cursor = null; - //string member = null; - //bool hasNext; - //do - //{ - // var page = await _redisDataCacheService.GetAllPagedDataOptimized( - // redisCacheMeterInfoHashKeyTemp, - // redisCacheMeterInfoZSetScoresIndexKeyTemp, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); - - // meterInfos.AddRange(page.Items); - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - //} while (hasNext); var allIds = new HashSet(); decimal? score = null; @@ -329,6 +261,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + + //先处理采集频率任务缓存 + foreach (var item in meterInfoGroupByTimeDensity) + { + TasksToBeIssueModel nextTask = new TasksToBeIssueModel() + { + TimeDensity = item.Key, + NextTaskTime = _kafkaOptions.FirstCollectionTime.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + }; + + //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 + + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); + await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); + } + foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; @@ -403,25 +351,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading } ammeterInfos.Add(ammeter); - //keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter); } - //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); - - //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = itemTimeDensity.Key, - NextTaskTime = DateTime.Now.AddMinutes(1) - }; - - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key); - await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } //初始化设备组负载控制 @@ -446,63 +382,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterOneMinuteReading() { + //获取缓存中的电表信息 - int timeDensity = 1; + int timeDensity = 5; var currentTime = DateTime.Now; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) + // 自动计算最佳并发度 + int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); + + var options = new ParallelOptions { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + MaxDegreeOfParallelism = recommendedThreads, + }; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterTaskInfosList = new List(); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + }); - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - //_= _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - //_dbProvider.SwitchSessionPool(true); - //await _dbProvider.InsertAsync(meterTaskInfosList); - - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); - - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); + await Task.CompletedTask; } @@ -516,57 +418,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading int timeDensity = 5; var currentTime = DateTime.Now; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) + // 自动计算最佳并发度 + int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); + + var options = new ParallelOptions { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + MaxDegreeOfParallelism = recommendedThreads, + }; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); - - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); - - _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + }); } /// @@ -575,11 +442,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() { - var stopwatch = Stopwatch.StartNew(); - //获取缓存中的电表信息 int timeDensity = 15; - var currentDateTime = DateTime.Now; + var currentTime = DateTime.Now; // 自动计算最佳并发度 int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); @@ -588,33 +453,84 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - string taskBatch = "20250417171649"; + var taskBatch = $"{currentTime:yyyyMMddHHmm00}"; + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _ = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + _ = CreateMeterKafkaTaskMessage(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); }); - stopwatch.Stop(); - - _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); - await Task.CompletedTask; } - /// - /// 电表创建发布任务 + /// 创建电表待发送的任务数据 /// /// 采集频率 - /// 集中器号hash分组的集中器集合数据 + /// 时间格式的任务批次名称 + /// + private async Task AmmerterCreatePublishTask(int timeDensity, string taskBatch) + { + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + + if (meterInfos == null || meterInfos.Count <= 0) + { + timer.Stop(); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + return; + } + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, taskBatch); + } + ); + + timer.Stop(); + _logger.LogInformation($"{nameof(AmmerterCreatePublishTaskAction)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + } + + + /// + /// 创建电表待发送的任务数据 + /// + /// 采集频率 + /// 电表信息 /// 集中器所在分组 /// 时间格式的任务批次名称 /// - private void AmmerterCreatePublishTask(int timeDensity + private void AmmerterCreatePublishTaskAction(int timeDensity , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; @@ -793,14 +709,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading taskList.Add(meterReadingRecords); } - + if (taskList == null || taskList.Count() <= 0 || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); return; } @@ -827,56 +743,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading // taskList); } - /// - /// Kafka 推送消息 - /// - /// 主题名称 - /// 任务记录 - /// 对应分区,也就是集中器号所在的分组序号 - /// - private async Task KafkaProducerIssuedMessage(string topicName, - MeterReadingTelemetryPacketInfo taskRecord, int partition) - { - if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) - { - throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); - } - - await _producerService.ProduceAsync(topicName, partition, taskRecord); - } - - private async Task GetTaskInfoListToKafka( - string redisCacheTelemetryPacketInfoHashKey, - string redisCacheTelemetryPacketInfoZSetScoresIndexKey) - { - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); - - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: page.Items, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); - } - ); - - } while (hasNext); - } - #endregion @@ -945,6 +811,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } + + _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成"); } @@ -1010,12 +878,58 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } + /// + /// 创建水表待发送的任务数据 + /// + /// 采集频率 + /// 水表信息 + /// 集中器所在分组 + /// 时间格式的任务批次名称 + /// + private void WatermeterCreatePublishTaskAction(int timeDensity + , WatermeterInfo meterInfo, int groupIndex, string taskBatch) + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + + + var currentTime = DateTime.Now; + var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + + + var taskInfo = new MeterReadingTelemetryPacketInfo() + { + Seq= null, + + }; + // + + Build188SendData.Build188WaterMeterReadingSendDataUnit(meterInfo.Address); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + // 主数据存储Hash + pipe.HSet(redisCacheTelemetryPacketInfoHashKey, taskInfo.MemberId, taskInfo.Serialize()); + + // Set索引缓存 + pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, taskInfo.MemberId); + + // ZSET索引缓存Key + pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, taskInfo.ScoreValue, taskInfo.MemberId); + + pipe.EndPipe(); + } + + } + #endregion #region 公共处理方法 - /// /// 判断是否需要生成采集指令 /// @@ -1032,39 +946,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading return false; } - ///// - ///// 指定时间对比当前时间 - ///// - ///// - ///// - ///// - //private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) - //{ - // if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 - // return false; - // return true; - //} - - ///// - ///// 缓存下一个时间的任务 - ///// - ///// 采集频率 - ///// 表类型 - ///// - //private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType) - //{ - // //缓存下一个时间的任务 - // TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - // { - // TimeDensity = timeDensity, - // NextTask = DateTime.Now.AddMinutes(timeDensity) - // }; - - // var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity); - // await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); - //} - - /// /// 获取缓存表计下发指令缓存key前缀 /// @@ -1076,6 +957,130 @@ namespace JiShe.CollectBus.ScheduledMeterReading return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*"; } + + /// + /// 创建表的待发送的任务数据 + /// + /// 采集频率 + /// 时间格式的任务批次名称 + /// 表类型 + /// 具体的创建任务的委托 + /// + private async Task CreateMeterPublishTask(int timeDensity, string taskBatch, MeterTypeEnum meterType, Action taskCreateAction) where T : DeviceCacheBasicModel + { + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + + if (meterInfos == null || meterInfos.Count <= 0) + { + timer.Stop(); + _logger.LogError($"{nameof(CreateMeterPublishTask)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + return; + } + + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + taskCreateAction(timeDensity, data, groupIndex, taskBatch); + } + ); + + timer.Stop(); + _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); + } + + + /// + /// 创建Kafka消息 + /// + /// + /// + /// + private async Task CreateMeterKafkaTaskMessage( + string redisCacheTelemetryPacketInfoHashKey, + string redisCacheTelemetryPacketInfoZSetScoresIndexKey) + { + if (string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)) + { + throw new Exception($"{nameof(CreateMeterKafkaTaskMessage)} 创建Kafka消息失败,参数异常,-101"); + } + + decimal? cursor = null; + string member = null; + bool hasNext; + var stopwatch = Stopwatch.StartNew(); + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: page.Items, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); + + } while (hasNext); + + stopwatch.Stop(); + _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} {redisCacheTelemetryPacketInfoHashKey}采集推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + } + + /// + /// Kafka 推送消息 + /// + /// 主题名称 + /// 任务记录 + /// 对应分区,也就是集中器号所在的分组序号 + /// + private async Task KafkaProducerIssuedMessageAction(string topicName, + MeterReadingTelemetryPacketInfo taskRecord, int partition) + { + if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) + { + throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); + } + + await _producerService.ProduceAsync(topicName, partition, taskRecord); + } + #endregion } diff --git a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index b03193a..654abc4 100644 --- a/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.Common.Consts; @@ -27,13 +28,15 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = "* 10 * * * *"; + CronExpression = "0 0/1 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { + _logger.LogError($"{DateTime.Now}"); // await _scheduledMeterReadingService.CreateToBeIssueTasks(); } } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs index 441b22a..005d46b 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFifteenMinuteWorker); - CronExpression = "* 15 * * * *"; + CronExpression = "0 0/15 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs index 0a61c63..fbd3668 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberFiveMinuteWorker); - CronExpression = "* 5 * * * *"; + CronExpression = "0 0/5 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs index 8b7cbfd..e9e0240 100644 --- a/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Hangfire; using JiShe.CollectBus.ScheduledMeterReading; @@ -26,7 +27,8 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(SubscriberOneMinuteWorker); - CronExpression = "* 1 * * * *"; + CronExpression = "0 0/1 * * * *"; + TimeZone = TimeZoneInfo.Local; this._scheduledMeterReadingService = scheduledMeterReadingService; } diff --git a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index 8b082bb..c07950f 100644 --- a/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -26,12 +26,7 @@ namespace JiShe.CollectBus.Ammeters /// 电表名称 /// public string Name { get; set; } - - /// - /// 集中器地址 - /// - public string FocusAddress { get; set; } - + /// /// 集中器地址 /// diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs index 966192b..be97769 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Enums; +using FreeSql.DataAnnotations; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using System; using System.Collections.Generic; @@ -12,7 +13,19 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 水表信息 /// public class WatermeterInfo: DeviceCacheBasicModel - { + { + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + [Column(IsIgnore = true)] + public override string MemberId => $"{FocusId}:{MeterId}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + [Column(IsIgnore = true)] + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; + /// /// 水表名称 /// @@ -21,11 +34,6 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 表密码 /// public string Password { get; set; } - - /// - /// 集中器地址 - /// - public string FocusAddress { get; set; } /// /// 一个集中器下的[MeteringCode]必须唯一。 PN diff --git a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index 4e3fef9..e6136df 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -200,5 +200,38 @@ namespace JiShe.CollectBus.Common.Extensions { return DateTimeOffset.FromUnixTimeMilliseconds(millis).DateTime; } + + /// + /// 采集时间节点计算 + /// + /// 待采集时间 + /// + /// + public static DateTime CalculateNextCollectionTime(this DateTime referenceTime, int interval) + { + // 计算精确到分钟的基准时间 + var baseTime = new DateTime( + referenceTime.Year, + referenceTime.Month, + referenceTime.Day, + referenceTime.Hour, + referenceTime.Minute, + 0); + + // 计算总分钟数和下一个间隔点 + int totalMinutes = baseTime.Hour * 60 + baseTime.Minute; + int nextTotalMinutes = ((totalMinutes / interval) + 1) * interval; + + // 处理跨天情况 + int daysToAdd = nextTotalMinutes / (24 * 60); + int remainingMinutes = nextTotalMinutes % (24 * 60); + int hours = remainingMinutes / 60; + int minutes = remainingMinutes % 60; + + return baseTime.Date + .AddDays(daysToAdd) + .AddHours(hours) + .AddMinutes(minutes); + } } } diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index fd7e3c8..1edc46a 100644 --- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -35,5 +35,10 @@ namespace JiShe.CollectBus.Common.Models /// 是否已处理 /// public virtual bool IsHandle { get; set; } = false; + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set;} } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index aaadf3f..afe25da 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +