using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.LogRecord; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Dto; using JiShe.CollectBus.Protocol.Models; using Mapster; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.DataChannels { /// /// 数据通道管理服务 /// public class DataChannelManageService : IDataChannelManageService, ITransientDependency { private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; private readonly IProducerService _producerService; private readonly KafkaOptionConfig _kafkaOptions; private readonly ServerApplicationOptions _applicationOptions; public DataChannelManageService( ILogger logger, IIoTDbProvider dbProvider, IProducerService producerService, IOptions kafkaOptions, IOptions applicationOptions) { _logger = logger; _dbProvider = dbProvider; _producerService = producerService; _kafkaOptions = kafkaOptions.Value; _applicationOptions = applicationOptions.Value; } /// /// 定时任务数据通道写入 /// /// public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, ValueTuple> dataItems) { await _telemetryPacketInfoWriter.WriteAsync(dataItems); } /// /// 定时任务数据入库和Kafka推送通道 /// public async Task ScheduledMeterTaskReadingAsync( ChannelReader>> telemetryPacketInfoReader) { const int BatchSize = 50000; const int EmptyWaitMilliseconds = 50; var timeout = TimeSpan.FromMilliseconds(50); var timer = Stopwatch.StartNew(); long timeoutMilliseconds = 0; var metadata = await _dbProvider.GetMetadata(); var timeoutStopwatch = Stopwatch.StartNew(); try { while (true) { var batch = new List>>(); var canRead = telemetryPacketInfoReader.Count; if (canRead <= 0) { if (timeoutMilliseconds > 0) { _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 任务数据通道处理数据耗时{timeoutMilliseconds}毫秒"); } timeoutMilliseconds = 0; //无消息时短等待50毫秒 await Task.Delay(EmptyWaitMilliseconds); continue; } timer.Restart(); timeoutStopwatch.Restart(); try { // 异步批量读取数据 while (batch != null && batch.Count < BatchSize && timeoutStopwatch.Elapsed <= timeout) { try { if (telemetryPacketInfoReader.TryRead(out var dataItem)) { batch.Add(dataItem); } } catch (Exception) { throw; } } } catch (Exception) { throw; } if (batch.Count == 0) { await Task.Delay(EmptyWaitMilliseconds); continue; } // 按TopicName分组处理 var topicGroups = new Dictionary>(); foreach (var (topicName, records) in batch) { if (!topicGroups.TryGetValue(topicName, out var list)) { list = new List(); topicGroups[topicName] = list; } list.AddRange(records); } // 处理每个分组 foreach (var (topicName, records) in topicGroups) { try { // 批量写入数据库 await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records); // 限流推送Kafka await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: records, deviceIdSelector: data => data.DeviceId, processor: async (data, groupIndex) => await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) ); } catch (Exception ex) { _logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName); } } batch.Clear(); timer.Stop(); timeoutStopwatch.Stop(); timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; } } catch (Exception ex) { _logger.LogCritical(ex, "定时任务处理发生致命错误"); throw; } } /// /// Kafka推送消息(增加重试机制和参数校验) /// protected async Task KafkaProducerIssuedMessageAction( string topicName, T taskRecord, int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } const int maxRetries = 3;//重试次数 for (int retry = 0; retry < maxRetries; retry++) { try { await _producerService.ProduceAsync(topicName, taskRecord, partition); return; } catch (Exception ex) { _logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries); if (retry == maxRetries - 1) throw; await Task.Delay(1000 * (retry + 1)); } } } ///// ///// 日志保存 ///// ///// ///// //public async Task LogSaveAsync(ChannelReader channelReader) //{ // const int BatchSize = 1000; // const int EmptyWaitMilliseconds = 1000; // var timeout = TimeSpan.FromSeconds(2); // var timer = Stopwatch.StartNew(); // long timeoutMilliseconds = 0; // try // { // while (true) // { // var batch = new List(); // var canRead = channelReader.Count; // if (canRead <= 0) // { // if (timeoutMilliseconds > 0) // { // _logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); // } // timeoutMilliseconds = 0; // //无消息时短等待1秒 // await Task.Delay(EmptyWaitMilliseconds); // continue; // } // timer.Restart(); // var startTime = DateTime.Now; // try // { // // 异步批量读取数据 // while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout) // { // try // { // if (channelReader.TryRead(out var dataItem)) // { // batch.Add(dataItem); // } // } // catch (Exception) // { // throw; // } // } // } // catch (Exception) // { // throw; // } // if (batch == null || batch.Count == 0) // { // await Task.Delay(EmptyWaitMilliseconds); // continue; // } // try // { // // 按小时分组 // var hourGroups = new Dictionary>(); // DateTime? dateTime = null; // List batchList = new List(); // int index = 1; // foreach (var item in batch) // { // var records = item.Adapt(); // if (!records.ReceivedTime.HasValue) // records.ReceivedTime = DateTime.Now; // var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0); // if (!dateTime.HasValue || curDateTime != dateTime) // { // dateTime = curDateTime; // if (batchList.Count > 0) // { // var immutableList = ImmutableList.CreateRange(batchList); // hourGroups.Add(dateTime.Value, immutableList.ToList()); // batchList.Clear(); // } // } // batchList.Add(records); // // 最后一批 // if(index== batch.Count) // { // var immutableList = ImmutableList.CreateRange(batchList); // hourGroups.Add(dateTime.Value, immutableList.ToList()); // batchList.Clear(); // } // index++; // } // foreach (var (time, records) in hourGroups) // { // // 批量写入数据库 // await _logRecordRepository.InsertManyAsync(records, time); // } // } // catch (Exception ex) // { // _logger.LogError(ex, "数据通道处理日志数据时发生异常"); // } // batch.Clear(); // timer.Stop(); // timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; // startTime = DateTime.Now; // } // } // catch (Exception ex) // { // _logger.LogCritical(ex, "日志处理发生致命错误"); // throw; // } //} } }