批量任务数据测试

This commit is contained in:
陈益 2025-04-17 22:07:58 +08:00
parent ac546dd029
commit 3fac4ebcbd

View File

@ -183,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) => processor: (data, groupIndex) =>
{ {
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
} }
); );
@ -576,8 +576,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{ {
Stopwatch stopwatch = new Stopwatch(); var stopwatch = Stopwatch.StartNew();
stopwatch.Start();
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
@ -591,93 +590,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MaxDegreeOfParallelism = recommendedThreads, MaxDegreeOfParallelism = recommendedThreads,
}; };
string taskBatch = "20250417155016"; string taskBatch = "20250417155016";
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
List<Task> tasks = new List<Task>();
for (int i = 0; i < _kafkaOptions.NumPartitions; i++)
{ {
Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>(); var task = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey);
decimal? cursor = null; tasks.Add(task);
string member = null; }
bool hasNext; await Task.WhenAll(tasks);
do
{
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items); stopwatch.Stop();
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
}
);
} while (hasNext);
});
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
// return;
//}
////获取下发任务缓存数据
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
// return;
//}
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
////将取出的缓存任务数据发送到Kafka消息队列中
//foreach (var focusItem in meterTaskInfos)
//{
// foreach (var ammerterItem in focusItem.Value)
// {
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
// {
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
// MessageId = ammerterItem.Value.IssuedMessageId,
// FocusAddress = ammerterItem.Value.FocusAddress,
// TimeDensity = timeDensity.ToString(),
// };
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// //_ = _producerBus.Publish(tempMsg);
// meterTaskInfosList.Add(ammerterItem.Value);
// }
//}
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
//{
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
//}
//stopwatch.Stop();
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
} }
@ -915,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param> /// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns> /// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName, private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingTelemetryPacketInfo taskRecord,int partition) MeterReadingTelemetryPacketInfo taskRecord, int partition)
{ {
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{ {
@ -925,56 +853,38 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _producerService.ProduceAsync(topicName, partition, taskRecord); await _producerService.ProduceAsync(topicName, partition, taskRecord);
} }
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) private async Task GetTaskInfoListToKafka(
string redisCacheTelemetryPacketInfoHashKey,
string redisCacheTelemetryPacketInfoZSetScoresIndexKey)
{ {
var currentDateTime = DateTime.Now; List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
decimal? cursor = null;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); string member = null;
bool hasNext;
//FreeRedisProvider.Instance.key() do
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{ {
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
return; redisCacheTelemetryPacketInfoHashKey,
} redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
//获取下发任务缓存数据 meterInfos.AddRange(page.Items);
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); cursor = page.HasNext ? page.NextScore : null;
if (meterTaskInfos == null || meterTaskInfos.Count <= 0) member = page.HasNext ? page.NextMember : null;
{ hasNext = page.HasNext;
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>(); await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
//将取出的缓存任务数据发送到Kafka消息队列中 deviceIdSelector: data => data.FocusAddress,
foreach (var focusItem in meterTaskInfos) processor: (data, groupIndex) =>
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{ {
MessageHexString = ammerterItem.Value.IssuedMessageHexString, _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex);
MessageId = ammerterItem.Value.IssuedMessageId, }
FocusAddress = ammerterItem.Value.FocusAddress, );
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); } while (hasNext);
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
} }
#endregion #endregion