From 18178e4d78b20cce642719e25d4d2c2e052c7e9f Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Fri, 18 Apr 2025 08:25:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 1a4b0dd..75fda10 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -604,13 +604,41 @@ namespace JiShe.CollectBus.ScheduledMeterReading { MaxDegreeOfParallelism = recommendedThreads, }; - string taskBatch = "20250417155016"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, groupIndex => + string taskBatch = "20250417171649"; + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { + Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - _= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + // _= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: page.Items, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); + + } while (hasNext); }); stopwatch.Stop();