using Amazon.Runtime.Internal.Transform;
using DnsClient.Protocol;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.LogRecord;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Dto;
using JiShe.CollectBus.Protocol.Models;
using JiShe.CollectBus.Repository.LogRecord;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.DataChannels
{
    /// <summary>
    /// Data channel management service.
    /// </summary>
    public class DataChannelManageService : IDataChannelManageService, ITransientDependency
    {
        private readonly ILogger<DataChannelManageService> _logger;
        private readonly IIoTDbProvider _dbProvider;
        private readonly IProducerService _producerService;
        private readonly KafkaOptionConfig _kafkaOptions;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly ILogRecordRepository _logRecordRepository;

        public DataChannelManageService(
            ILogger<DataChannelManageService> logger,
            IIoTDbProvider dbProvider,
            IProducerService producerService,
            IOptions<KafkaOptionConfig> kafkaOptions,
            IOptions<ServerApplicationOptions> applicationOptions,
            ILogRecordRepository logRecordRepository)
        {
            _logger = logger;
            _dbProvider = dbProvider;
            _producerService = producerService;
            _kafkaOptions = kafkaOptions.Value;
            _applicationOptions = applicationOptions.Value;
            _logRecordRepository = logRecordRepository;
        }

        // NOTE: the generic channel arguments in this file were reconstructed from the
        // surrounding parameter names; TelemetryPacketInfo, LogRecordDto and LogRecordEntity
        // are assumed type names and may differ in the original project.

        /// <summary>
        /// Writes one batch of scheduled-task telemetry data into the data channel.
        /// </summary>
        /// <param name="telemetryPacketInfoWriter">Writer side of the telemetry channel.</param>
        /// <param name="dataItems">Tuple of (topic name, telemetry records).</param>
        public async Task ScheduledMeterTaskWriterAsync(
            ChannelWriter<ValueTuple<string, List<TelemetryPacketInfo>>> telemetryPacketInfoWriter,
            ValueTuple<string, List<TelemetryPacketInfo>> dataItems)
        {
            await telemetryPacketInfoWriter.WriteAsync(dataItems);
        }

        /// <summary>
        /// Scheduled-task channel consumer: batch-inserts records into the database
        /// and pushes them to Kafka.
        /// </summary>
        public async Task ScheduledMeterTaskReadingAsync(
            ChannelReader<ValueTuple<string, List<TelemetryPacketInfo>>> telemetryPacketInfoReader)
        {
            const int BatchSize = 50000;
            const int EmptyWaitMilliseconds = 50;
            var timeout = TimeSpan.FromMilliseconds(50);
            var timer = Stopwatch.StartNew();
            long timeoutMilliseconds = 0;
            var metadata = await _dbProvider.GetMetadata();
            var timeoutStopwatch = Stopwatch.StartNew();
            try
            {
                while (true)
                {
                    var batch = new List<ValueTuple<string, List<TelemetryPacketInfo>>>();
                    if (telemetryPacketInfoReader.Count <= 0)
                    {
                        if (timeoutMilliseconds > 0)
                        {
                            _logger.LogInformation($"{nameof(ScheduledMeterTaskReadingAsync)} channel batch processing took {timeoutMilliseconds} ms");
                        }
                        timeoutMilliseconds = 0;
                        // No pending messages: wait briefly (50 ms) before polling again.
                        await Task.Delay(EmptyWaitMilliseconds);
                        continue;
                    }

                    timer.Restart();
                    timeoutStopwatch.Restart();

                    // Read up to BatchSize items, but never spend longer than the batch timeout.
                    while (batch.Count < BatchSize && timeoutStopwatch.Elapsed <= timeout)
                    {
                        if (!telemetryPacketInfoReader.TryRead(out var dataItem))
                        {
                            break; // channel drained
                        }
                        batch.Add(dataItem);
                    }

                    if (batch.Count == 0)
                    {
                        await Task.Delay(EmptyWaitMilliseconds);
                        continue;
                    }

                    // Group records by topic name.
                    var topicGroups = new Dictionary<string, List<TelemetryPacketInfo>>();
                    foreach (var (topicName, records) in batch)
                    {
                        if (!topicGroups.TryGetValue(topicName, out var list))
                        {
                            list = new List<TelemetryPacketInfo>();
                            topicGroups[topicName] = list;
                        }
                        list.AddRange(records);
                    }

                    // Process each topic group.
                    foreach (var (topicName, records) in topicGroups)
                    {
                        try
                        {
                            // Batch insert into the time-series database.
                            await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records);

                            // Throttled push to Kafka, balanced across device groups.
                            await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
                                items: records,
                                deviceIdSelector: data => data.DeviceId,
                                processor: async (data, groupIndex) =>
                                    await KafkaProducerIssuedMessageAction(topicName, data, groupIndex));
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Data channel error while processing topic {TopicName}", topicName);
                        }
                    }

                    batch.Clear();
                    timer.Stop();
                    timeoutStopwatch.Stop();
                    timeoutMilliseconds += timer.ElapsedMilliseconds;
                }
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Fatal error in scheduled task processing");
                throw;
            }
        }

        /// <summary>
        /// Pushes a message to Kafka, with parameter validation and a retry mechanism.
        /// </summary>
        protected async Task KafkaProducerIssuedMessageAction<T>(
            string topicName, T taskRecord, int partition) where T : class
        {
            if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
            {
                throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} message push failed: invalid arguments, -101");
            }

            const int maxRetries = 3; // retry count
            for (int retry = 0; retry < maxRetries; retry++)
            {
                try
                {
                    await _producerService.ProduceAsync(topicName, taskRecord, partition);
                    return;
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex,
                        "Kafka push to topic {TopicName} partition {Partition} retrying ({Retry}/{MaxRetries})",
                        topicName, partition, retry + 1, maxRetries);
                    if (retry == maxRetries - 1)
                        throw;
                    // Linear back-off: 1s, 2s, 3s.
                    await Task.Delay(1000 * (retry + 1));
                }
            }
        }

        /// <summary>
        /// Persists log records from the channel in hourly batches.
        /// </summary>
        /// <param name="channelReader">Reader side of the log channel.</param>
        public async Task LogSaveAsync(ChannelReader<LogRecordDto> channelReader)
        {
            const int BatchSize = 1000;
            const int EmptyWaitMilliseconds = 1000;
            var timeout = TimeSpan.FromSeconds(2);
            var timer = Stopwatch.StartNew();
            long timeoutMilliseconds = 0;
            try
            {
                while (true)
                {
                    var batch = new List<LogRecordDto>();
                    if (channelReader.Count <= 0)
                    {
                        if (timeoutMilliseconds > 0)
                        {
                            _logger.LogInformation($"{nameof(LogSaveAsync)} channel batch processing took {timeoutMilliseconds} ms");
                        }
                        timeoutMilliseconds = 0;
                        // No pending messages: wait one second before polling again.
                        await Task.Delay(EmptyWaitMilliseconds);
                        continue;
                    }

                    timer.Restart();
                    var startTime = DateTime.Now;

                    // Read up to BatchSize items within the batch timeout.
                    while (batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
                    {
                        if (!channelReader.TryRead(out var dataItem))
                        {
                            break; // channel drained
                        }
                        batch.Add(dataItem);
                    }

                    if (batch.Count == 0)
                    {
                        await Task.Delay(EmptyWaitMilliseconds);
                        continue;
                    }

                    try
                    {
                        // Group records by the hour in which they were received.
                        var hourGroups = new Dictionary<DateTime, List<LogRecordEntity>>();
                        foreach (var item in batch)
                        {
                            var record = item.Adapt<LogRecordEntity>();
                            if (!record.ReceivedTime.HasValue)
                                record.ReceivedTime = DateTime.Now;

                            // Truncate the received time to the hour.
                            var received = record.ReceivedTime.Value;
                            var hourKey = new DateTime(received.Year, received.Month, received.Day,
                                received.Hour, 0, 0);

                            if (!hourGroups.TryGetValue(hourKey, out var list))
                            {
                                list = new List<LogRecordEntity>();
                                hourGroups[hourKey] = list;
                            }
                            list.Add(record);
                        }

                        foreach (var (time, records) in hourGroups)
                        {
                            // Batch insert into the hourly log store.
                            await _logRecordRepository.InsertManyAsync(records, time);
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Data channel error while processing log records");
                    }

                    batch.Clear();
                    timer.Stop();
                    timeoutMilliseconds += timer.ElapsedMilliseconds;
                }
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "Fatal error in log processing");
                throw;
            }
        }
    }
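
    // ------------------------------------------------------------------------------
    // Usage sketch (illustrative only). It shows how a host might wire a channel
    // between the producer side (ScheduledMeterTaskWriterAsync) and the long-running
    // consumer loop (ScheduledMeterTaskReadingAsync). The hosting shape, the topic
    // name, and the TelemetryPacketInfo element type are assumptions, not part of
    // the service contract above.
    // ------------------------------------------------------------------------------
    public class DataChannelUsageExample
    {
        private readonly DataChannelManageService _channelService;

        public DataChannelUsageExample(DataChannelManageService channelService)
        {
            _channelService = channelService;
        }

        public async Task RunAsync(CancellationToken cancellationToken)
        {
            // One unbounded channel shared by writers and the single reader loop.
            // Unbounded writes never block; a bounded channel would add back-pressure.
            var channel = Channel.CreateUnbounded<ValueTuple<string, List<TelemetryPacketInfo>>>(
                new UnboundedChannelOptions
                {
                    SingleReader = true,  // only the batch loop reads
                    SingleWriter = false  // several scheduled tasks may write concurrently
                });

            // Start the consumer loop on the thread pool.
            var readerTask = Task.Run(
                () => _channelService.ScheduledMeterTaskReadingAsync(channel.Reader),
                cancellationToken);

            // Producer side: each scheduled task pushes a (topicName, records) tuple.
            var records = new List<TelemetryPacketInfo>(); // filled in by the scheduler
            await _channelService.ScheduledMeterTaskWriterAsync(
                channel.Writer, ("hypothetical-telemetry-topic", records));

            // The consumer loop only exits by throwing, so this await completes
            // only on a fatal error.
            await readerTask;
        }
    }
}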