修改代码

This commit is contained in:
ChenYi 2025-04-17 23:09:59 +08:00
parent 3fac4ebcbd
commit 7308a3f03b

View File

@ -581,6 +581,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
var currentDateTime = DateTime.Now; var currentDateTime = DateTime.Now;
//string taskBatch = "20250417155016";
//List<Task> tasks = new List<Task>();
//for (int i = 0; i < _kafkaOptions.NumPartitions; i++)
//{
// var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
// var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
// var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
// await GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
// // tasks.Add(task);
//}
//await Task.WhenAll(tasks);
// 自动计算最佳并发度 // 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
@ -590,19 +605,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MaxDegreeOfParallelism = recommendedThreads, MaxDegreeOfParallelism = recommendedThreads,
}; };
string taskBatch = "20250417155016"; string taskBatch = "20250417155016";
Parallel.For(0, _kafkaOptions.NumPartitions, options, groupIndex =>
List<Task> tasks = new List<Task>();
for (int i = 0; i < _kafkaOptions.NumPartitions; i++)
{ {
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
_= GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
});
var task = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
tasks.Add(task);
}
await Task.WhenAll(tasks);
stopwatch.Stop(); stopwatch.Stop();
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
@ -857,7 +867,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string redisCacheTelemetryPacketInfoHashKey, string redisCacheTelemetryPacketInfoHashKey,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey) string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
{ {
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext; bool hasNext;
@ -870,13 +879,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastScore: cursor, lastScore: cursor,
lastMember: member); lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null; cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null; member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext; hasNext = page.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: page.Items,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) => processor: (data, groupIndex) =>
{ {