修改代码

This commit is contained in:
ChenYi 2025-04-18 08:25:28 +08:00
parent 7308a3f03b
commit 18178e4d78

View File

@ -604,13 +604,41 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
MaxDegreeOfParallelism = recommendedThreads,
};
string taskBatch = "20250417155016";
Parallel.For(0, _kafkaOptions.NumPartitions, options, groupIndex =>
string taskBatch = "20250417171649";
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
_= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
// _= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: page.Items,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
}
);
} while (hasNext);
});
stopwatch.Stop();