using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.DataChannels { /// /// 数据通道管理服务 /// public class DataChannelManageService : IDataChannelManageService, ITransientDependency { private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; private readonly IProducerService _producerService; private readonly KafkaOptionConfig _kafkaOptions; private readonly ServerApplicationOptions _applicationOptions; private readonly IoTDBRuntimeContext _runtimeContext; public DataChannelManageService( ILogger logger, IIoTDbProvider dbProvider, IoTDBRuntimeContext runtimeContext, IProducerService producerService, IOptions kafkaOptions, IOptions applicationOptions) { _logger = logger; _dbProvider = dbProvider; _runtimeContext = runtimeContext; _producerService = producerService; _kafkaOptions = kafkaOptions.Value; _applicationOptions = applicationOptions.Value; _runtimeContext.UseTableSessionPool = true; } /// /// 定时任务数据通道写入 /// /// public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems) { await _telemetryPacketInfoWriter.WriteAsync(dataItems); } /// /// 定时任务数据入库和Kafka推送通道 /// public async Task ScheduledMeterTaskReadingAsync( ChannelReader>> telemetryPacketInfoReader, CancellationToken cancellationToken = default) { const int BatchSize = 20000; // 修正批次大小 const int EmptyWaitMilliseconds = 1000; var timeout = TimeSpan.FromSeconds(5); var metadata = await _dbProvider.GetMetadata(); try { while (!cancellationToken.IsCancellationRequested) { var batchStopwatch = Stopwatch.StartNew(); var batch = new List>>(); try { // 异步批量读取数据 while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout) { while (telemetryPacketInfoReader.TryRead(out var data)) { batch.Add(data); if (batch.Count >= BatchSize) break; } if (batch.Count >= BatchSize) break; // 无更多数据时等待 if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken)) break; } } catch (OperationCanceledException) { break; } if (batch.Count == 0) { await Task.Delay(EmptyWaitMilliseconds, cancellationToken); continue; } // 按TopicName分组处理 var topicGroups = new Dictionary>(); foreach (var (topicName, records) in batch) { if (!topicGroups.TryGetValue(topicName, out var list)) { list = new List(); topicGroups[topicName] = list; } list.AddRange(records); } // 处理每个分组 foreach (var (topicName, records) in topicGroups) { try { // 批量写入数据库 await _dbProvider.BatchInsertAsync(metadata, records); // 限流推送Kafka await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: records, deviceIdSelector: data => data.DeviceId, processor: async (data, groupIndex) => await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) ); } catch (Exception ex) { _logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName); } } _logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms", batch.Count, batchStopwatch.ElapsedMilliseconds); } } catch (Exception ex) { _logger.LogCritical(ex, "定时任务处理发生致命错误"); throw; } } /// /// Kafka推送消息(增加重试机制和参数校验) /// protected async Task KafkaProducerIssuedMessageAction( string topicName, T taskRecord, int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } const int maxRetries = 3;//重试次数 for (int retry = 0; retry < maxRetries; retry++) { try { await _producerService.ProduceAsync(topicName, taskRecord, partition); return; } catch (Exception ex) { _logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries); if (retry == maxRetries - 1) throw; await Task.Delay(1000 * (retry + 1)); } } } } }