From 7308a3f03bf70b970fcb9e0d3f55bf039b74347b Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 17 Apr 2025 23:09:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 45b77b5..1a4b0dd 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -581,6 +581,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; var currentDateTime = DateTime.Now; + + //string taskBatch = "20250417155016"; + + //List tasks = new List(); + //for (int i = 0; i < _kafkaOptions.NumPartitions; i++) + //{ + // var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + // var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + // var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + + // await GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + // // tasks.Add(task); + //} + //await Task.WhenAll(tasks); + // 自动计算最佳并发度 int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); @@ -590,19 +605,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading MaxDegreeOfParallelism = recommendedThreads, }; string taskBatch = "20250417155016"; - - List tasks = new List(); - for (int i = 0; i < _kafkaOptions.NumPartitions; i++) + Parallel.For(0, _kafkaOptions.NumPartitions, options, groupIndex => { - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + + _= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + }); - var task = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); - tasks.Add(task); - } - await Task.WhenAll(tasks); - stopwatch.Stop(); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); @@ -857,7 +867,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading string redisCacheTelemetryPacketInfoHashKey, string redisCacheTelemetryPacketInfoZSetScoresIndexKey) { - List meterInfos = new List(); decimal? cursor = null; string member = null; bool hasNext; @@ -870,13 +879,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastScore: cursor, lastMember: member); - meterInfos.AddRange(page.Items); cursor = page.HasNext ? page.NextScore : null; member = page.HasNext ? page.NextMember : null; hasNext = page.HasNext; await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, + items: page.Items, deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => {