using JiShe.CollectBus.Common;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Threading.Channels;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.DataChannels
{
    // NOTE(review): the generic type arguments in this file were reconstructed.
    // ILogger<>/IOptions<> arguments follow from the field types; the record element
    // type (MeterReadingTelemetryPacketInfo) and the type argument of GetMetadata<>()
    // are assumptions - confirm them against IDataChannelManageService and the
    // IoTDB provider before merging.

    /// <summary>
    /// Data channel management service. Writes scheduled meter task packets into a
    /// channel and, on the reading side, persists them through the IoTDB session pool
    /// and forwards them to Kafka.
    /// </summary>
    public class DataChannelManageService : IDataChannelManageService, ITransientDependency
    {
        private readonly ILogger<DataChannelManageService> _logger;
        private readonly IoTDBSessionPoolProvider _dbProvider;
        private readonly IProducerService _producerService;
        private readonly KafkaOptionConfig _kafkaOptions;
        private readonly ServerApplicationOptions _applicationOptions;

        /// <summary>
        /// Creates the service from its injected collaborators.
        /// </summary>
        /// <param name="logger">Logger used for warnings, errors and critical failures.</param>
        /// <param name="dbProvider">Provider of entity metadata and database sessions.</param>
        /// <param name="producerService">Kafka producer used to publish each record.</param>
        /// <param name="kafkaOptions">Kafka options; only the resolved <c>Value</c> is stored.</param>
        /// <param name="applicationOptions">Application options; only the resolved <c>Value</c> is stored.</param>
        public DataChannelManageService(
            ILogger<DataChannelManageService> logger,
            IoTDBSessionPoolProvider dbProvider,
            IProducerService producerService,
            IOptions<KafkaOptionConfig> kafkaOptions,
            IOptions<ServerApplicationOptions> applicationOptions)
        {
            _logger = logger;
            _dbProvider = dbProvider;
            _producerService = producerService;
            _kafkaOptions = kafkaOptions.Value;
            _applicationOptions = applicationOptions.Value;
        }

        /// <summary>
        /// Writes one (topic name, records) item into the scheduled task data channel.
        /// </summary>
        /// <param name="_telemetryPacketInfoWriter">Channel writer that receives the item.</param>
        /// <param name="dataItems">Topic name together with the records destined for that topic.</param>
        /// <returns>A task that completes once the channel has accepted the item.</returns>
        public async Task ScheduledMeterTaskWriterAsync(
            ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter,
            ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
        {
            await _telemetryPacketInfoWriter.WriteAsync(dataItems);
        }

        /// <summary>
        /// Consumer loop of the scheduled task data channel: drains the channel in batches,
        /// groups the records by topic, inserts each group into the database and then
        /// publishes every record of the group to Kafka.
        /// </summary>
        /// <remarks>
        /// The loop has no normal exit; it only ends when an exception escapes, which is
        /// logged as critical and rethrown. A failure while handling a single topic group
        /// is logged and the remaining groups are still processed.
        /// </remarks>
        /// <param name="telemetryPacketInfoReader">Channel reader supplying (topic name, records) items.</param>
        /// <returns>A task that only completes by faulting.</returns>
        public async Task ScheduledMeterTaskReadingAsync(
            ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader)
        {
            const int BatchSize = 50000;
            const int EmptyWaitMilliseconds = 50;
            var timeout = TimeSpan.FromMilliseconds(50);

            var timer = Stopwatch.StartNew();
            var timeoutStopwatch = Stopwatch.StartNew();

            // Processing time accumulated since the channel was last found empty.
            long timeoutMilliseconds = 0;

            var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();

            // Reused across iterations instead of allocating a new list on every pass.
            var batch = new List<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();

            try
            {
                while (true)
                {
                    timer.Restart();
                    timeoutStopwatch.Restart();

                    // Drain what is currently available, bounded by batch size and by the
                    // read time budget. Stopping at the first failed TryRead avoids
                    // hot-spinning on an empty channel until the budget expires, and does
                    // not depend on ChannelReader.Count, which not every reader supports.
                    while (batch.Count < BatchSize
                           && timeoutStopwatch.Elapsed <= timeout
                           && telemetryPacketInfoReader.TryRead(out var dataItem))
                    {
                        batch.Add(dataItem);
                    }

                    if (batch.Count == 0)
                    {
                        if (timeoutMilliseconds > 0)
                        {
                            // NOTE(review): this is a timing report logged at Error level,
                            // kept as in the original - confirm the level is intentional.
                            _logger.LogError(
                                "{MethodName} 任务数据通道处理数据耗时{ElapsedMilliseconds}毫秒",
                                nameof(ScheduledMeterTaskReadingAsync),
                                timeoutMilliseconds);
                        }

                        timeoutMilliseconds = 0;

                        // Nothing to process: wait briefly before polling again.
                        await Task.Delay(EmptyWaitMilliseconds);
                        continue;
                    }

                    // Merge the records of all batch items that share a topic name.
                    var topicGroups = new Dictionary<string, List<MeterReadingTelemetryPacketInfo>>();
                    foreach (var (topicName, records) in batch)
                    {
                        if (!topicGroups.TryGetValue(topicName, out var list))
                        {
                            list = new List<MeterReadingTelemetryPacketInfo>();
                            topicGroups[topicName] = list;
                        }

                        list.AddRange(records);
                    }

                    foreach (var (topicName, records) in topicGroups)
                    {
                        try
                        {
                            // Insert first; the Kafka push only runs when the insert succeeded.
                            await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records);

                            // The group index supplied by the throttle helper is used as the
                            // Kafka partition.
                            await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
                                items: records,
                                deviceIdSelector: data => data.DeviceId,
                                processor: async (data, groupIndex) =>
                                    await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
                            );
                        }
                        catch (Exception ex)
                        {
                            // One failing topic must not stop the others; its records are
                            // not retried here.
                            _logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName);
                        }
                    }

                    batch.Clear();
                    timer.Stop();
                    timeoutStopwatch.Stop();
                    timeoutMilliseconds += timer.ElapsedMilliseconds;
                }
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "定时任务处理发生致命错误");
                throw;
            }
        }

        /// <summary>
        /// Publishes one record to Kafka, validating the arguments and retrying on failure.
        /// </summary>
        /// <remarks>
        /// Up to three attempts are made. After a failed attempt the method waits
        /// 1000 ms multiplied by the attempt number before retrying; the exception of
        /// the last attempt is rethrown.
        /// </remarks>
        /// <typeparam name="T">Type of the record being published.</typeparam>
        /// <param name="topicName">Target topic; must not be null, empty or whitespace.</param>
        /// <param name="taskRecord">Record to publish; must not be null.</param>
        /// <param name="partition">Partition passed to the producer.</param>
        /// <returns>A task that completes when the record has been produced.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="topicName"/> is blank or <paramref name="taskRecord"/> is null.
        /// </exception>
        protected async Task KafkaProducerIssuedMessageAction<T>(
            string topicName,
            T taskRecord,
            int partition) where T : class
        {
            if (string.IsNullOrWhiteSpace(topicName) || taskRecord is null)
            {
                // ArgumentException derives from Exception, so existing handlers still catch it.
                throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
            }

            const int maxRetries = 3; // total number of attempts

            for (int retry = 0; retry < maxRetries; retry++)
            {
                try
                {
                    await _producerService.ProduceAsync(topicName, taskRecord, partition);
                    return;
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries);

                    if (retry == maxRetries - 1)
                    {
                        throw;
                    }

                    // Back off linearly: 1 s after the first failure, 2 s after the second.
                    await Task.Delay(1000 * (retry + 1));
                }
            }
        }
    }
}