diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 97bcc50..45b77b5 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -183,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); + AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); @@ -576,8 +576,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task AmmeterScheduledMeterFifteenMinuteReading() { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); + var stopwatch = Stopwatch.StartNew(); //获取缓存中的电表信息 int timeDensity = 15; @@ -591,96 +590,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading MaxDegreeOfParallelism = recommendedThreads, }; string taskBatch = "20250417155016"; - Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => + + List tasks = new List(); + for (int i = 0; i < _kafkaOptions.NumPartitions; i++) { - Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); - var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; - var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, i, taskBatch)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; - bool hasNext; - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + var task = GetTaskInfoListToKafka(redisCacheTelemetryPacketInfoHashKey, redisCacheTelemetryPacketInfoZSetScoresIndexKey); + tasks.Add(task); + } + await Task.WhenAll(tasks); + + stopwatch.Stop(); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, groupIndex) => - { - _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex); - } - ); - - } while (hasNext); - }); - - - - //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - // return; - //} - - ////获取下发任务缓存数据 - //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - //if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - //{ - // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - // return; - //} - - //List meterTaskInfosList = new List(); - - ////将取出的缓存任务数据发送到Kafka消息队列中 - //foreach (var focusItem in meterTaskInfos) - //{ - // foreach (var ammerterItem in focusItem.Value) - // { - // var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - // { - // MessageHexString = ammerterItem.Value.IssuedMessageHexString, - // MessageId = ammerterItem.Value.IssuedMessageId, - // FocusAddress = ammerterItem.Value.FocusAddress, - // TimeDensity = timeDensity.ToString(), - // }; - // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - // //_ = _producerBus.Publish(tempMsg); - - // meterTaskInfosList.Add(ammerterItem.Value); - // } - //} - //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - //{ - // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - //} - - - //stopwatch.Stop(); - - //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } - + /// /// 电表创建发布任务 @@ -698,7 +626,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; @@ -915,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 对应分区,也就是集中器号所在的分组序号 /// private async Task KafkaProducerIssuedMessage(string topicName, - MeterReadingTelemetryPacketInfo taskRecord,int partition) + MeterReadingTelemetryPacketInfo taskRecord, int partition) { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { @@ -925,58 +853,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _producerService.ProduceAsync(topicName, partition, taskRecord); } - private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) + private async Task GetTaskInfoListToKafka( + string redisCacheTelemetryPacketInfoHashKey, + string redisCacheTelemetryPacketInfoZSetScoresIndexKey) { - var currentDateTime = DateTime.Now; - - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType); - - //FreeRedisProvider.Instance.key() - - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + _ = KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, data, groupIndex); + } + ); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - } + } while (hasNext); } - + #endregion