using DnsClient.Protocol;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Services;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Timing;

namespace JiShe.CollectBus.DataChannels
{
    /// <summary>
    /// Data channel management service: writes scheduled meter-reading packets into a
    /// channel and drains that channel in batches into the IoTDB provider.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (the telemetry element type
    /// <c>MeterReadingTelemetryPacketInfo</c>, the logger/options type arguments and the
    /// type argument passed to <c>GetMetadata</c>) were reconstructed from usage —
    /// confirm them against <see cref="IDataChannelManageService"/> and the provider API.
    /// </remarks>
    public class DataChannelManageService : IDataChannelManageService, ITransientDependency
    {
        /// <summary>Target number of records collected before a batch is flushed.</summary>
        private const int BatchSize = 20_000;

        /// <summary>Longest time a batch keeps collecting records before it is flushed.</summary>
        private static readonly TimeSpan BatchWindow = TimeSpan.FromSeconds(5);

        private readonly ILogger<DataChannelManageService> _logger;
        private readonly IIoTDbProvider _dbProvider;
        private readonly IProducerService _producerService;
        private readonly KafkaOptionConfig _kafkaOptions;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly IoTDBRuntimeContext _runtimeContext;

        /// <summary>
        /// Creates the service and switches the supplied runtime context to the
        /// table session pool.
        /// </summary>
        public DataChannelManageService(
            ILogger<DataChannelManageService> logger,
            IIoTDbProvider dbProvider,
            IoTDBRuntimeContext runtimeContext,
            IProducerService producerService,
            IOptions<KafkaOptionConfig> kafkaOptions,
            IOptions<ServerApplicationOptions> applicationOptions)
        {
            _logger = logger;
            _dbProvider = dbProvider;
            _runtimeContext = runtimeContext;
            _producerService = producerService;
            _kafkaOptions = kafkaOptions.Value;
            _applicationOptions = applicationOptions.Value;
            _runtimeContext.UseTableSessionPool = true;
        }

        /// <summary>
        /// Writes one scheduled-task data item into the channel.
        /// </summary>
        /// <param name="_telemetryPacketInfoWriter">Channel writer that receives the item.</param>
        /// <param name="dataItems">Pair of topic name (Item1) and telemetry records (Item2).</param>
        /// <returns>A task that completes once the item has been written.</returns>
        public async Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
        {
            await _telemetryPacketInfoWriter.WriteAsync(dataItems);
        }

        /// <summary>
        /// Drains the scheduled-task channel: collects records into batches, inserts each
        /// batch through the IoTDB provider and runs it through the device-group throttle.
        /// </summary>
        /// <param name="_telemetryPacketInfoReader">Channel reader to drain.</param>
        /// <returns>
        /// A task that completes when the channel is completed and fully drained.
        /// </returns>
        /// <remarks>
        /// Any failure is logged and rethrown, so the caller observes a faulted task.
        /// </remarks>
        public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
        {
            try
            {
                var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
                var batch = new List<MeterReadingTelemetryPacketInfo>();
                var timer = new Stopwatch();
                long busyMilliseconds = 0;

                while (true)
                {
                    // WaitToReadAsync completes synchronously when data is already queued,
                    // so a still-pending wait means the channel has just run dry: that is
                    // the moment to report how long the preceding burst took.
                    var pendingRead = _telemetryPacketInfoReader.WaitToReadAsync();
                    if (!pendingRead.IsCompleted && busyMilliseconds > 0)
                    {
                        _logger.LogInformation(
                            "{Method} 通道处理数据耗时{ElapsedMilliseconds}毫秒",
                            nameof(ScheduledMeterTaskReadding),
                            busyMilliseconds);
                        busyMilliseconds = 0;
                    }

                    // false => the writer completed the channel and nothing is left to read.
                    if (!await pendingRead)
                    {
                        break;
                    }

                    timer.Restart();

                    await FillBatchAsync(_telemetryPacketInfoReader, batch);

                    if (batch.Count > 0)
                    {
                        await _dbProvider.BatchInsertAsync(metadata, batch);

                        await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
                            // Snapshot so the helper never observes the Clear() below.
                            items: batch.ToList(),
                            deviceIdSelector: data => data.DeviceId,
                            processor: (data, groupIndex) =>
                            {
                                // TODO: the Kafka push is currently disabled. When it is
                                // re-enabled, carry the topic name with each record —
                                // one batch can mix records from several topics.
                            });

                        batch.Clear();
                    }

                    timer.Stop();
                    busyMilliseconds += timer.ElapsedMilliseconds;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{Method} channel processing failed", nameof(ScheduledMeterTaskReadding));
                throw;
            }
        }

        /// <summary>
        /// Moves queued records into <paramref name="batch"/> until it holds at least
        /// <see cref="BatchSize"/> records, <see cref="BatchWindow"/> elapses, or the
        /// channel is completed — whichever happens first.
        /// </summary>
        /// <param name="reader">Channel reader to take items from.</param>
        /// <param name="batch">List that receives the records.</param>
        private static async Task FillBatchAsync(
            ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> reader,
            List<MeterReadingTelemetryPacketInfo> batch)
        {
            using (var window = new CancellationTokenSource(BatchWindow))
            {
                try
                {
                    do
                    {
                        while (batch.Count < BatchSize && reader.TryRead(out var dataItem))
                        {
                            // Skip empty payloads instead of letting AddRange(null) kill the pump.
                            if (dataItem?.Item2 != null)
                            {
                                batch.AddRange(dataItem.Item2);
                            }
                        }
                    }
                    // Wait asynchronously for more data instead of spinning on TryRead.
                    while (batch.Count < BatchSize && await reader.WaitToReadAsync(window.Token));
                }
                catch (OperationCanceledException) when (window.IsCancellationRequested)
                {
                    // Batch window elapsed: flush whatever has been collected so far.
                }
            }
        }

        /// <summary>
        /// Pushes one record to Kafka.
        /// </summary>
        /// <typeparam name="T">Record type.</typeparam>
        /// <param name="topicName">Topic name.</param>
        /// <param name="taskRecord">Task record to publish.</param>
        /// <param name="partition">
        /// Target partition, i.e. the group index the concentrator number belongs to.
        /// </param>
        /// <returns>A task that completes when the producer call completes.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="topicName"/> is null or whitespace, or <paramref name="taskRecord"/> is null.
        /// </exception>
        protected async Task KafkaProducerIssuedMessageAction<T>(string topicName, T taskRecord, int partition) where T : class
        {
            if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
            {
                throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
            }

            await _producerService.ProduceAsync(topicName, taskRecord, partition);
        }
    }
}