using DnsClient.Protocol;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Services;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Timing;

namespace JiShe.CollectBus.DataChannels
{
    /// <summary>
    /// Data channel management service: writes scheduled meter-reading task records
    /// into a channel and drains that channel in batches into the IoTDB provider.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file were missing from the text
    /// under review and have been restored. <c>MeterReadingTelemetryPacketInfo</c> as the
    /// record type and <c>string</c> as the first tuple item are inferred (imported
    /// namespace, commented-out Kafka call) - confirm against IDataChannelManageService.
    /// </remarks>
    public class DataChannelManageService : IDataChannelManageService, ITransientDependency
    {
        private readonly ILogger<DataChannelManageService> _logger;
        private readonly IIoTDbProvider _dbProvider;
        private readonly IProducerService _producerService;
        private readonly KafkaOptionConfig _kafkaOptions;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly IoTDBRuntimeContext _runtimeContext;

        /// <summary>
        /// Stores the injected dependencies and turns on <c>UseTableSessionPool</c>
        /// on the supplied IoTDB runtime context.
        /// </summary>
        public DataChannelManageService(
            ILogger<DataChannelManageService> logger,
            IIoTDbProvider dbProvider,
            IoTDBRuntimeContext runtimeContext,
            IProducerService producerService,
            IOptions<KafkaOptionConfig> kafkaOptions,
            IOptions<ServerApplicationOptions> applicationOptions)
        {
            _logger = logger;
            _dbProvider = dbProvider;
            _runtimeContext = runtimeContext;
            _producerService = producerService;
            _kafkaOptions = kafkaOptions.Value;
            _applicationOptions = applicationOptions.Value;

            _runtimeContext.UseTableSessionPool = true;
        }

        /// <summary>
        /// Writes one item of scheduled meter-reading task data into the channel.
        /// </summary>
        /// <param name="_telemetryPacketInfoWriter">Channel writer that receives the item.</param>
        /// <param name="dataItems">Tuple whose second item is the list of task records.</param>
        /// <returns>A task that completes once the item has been written.</returns>
        public async Task ScheduledMeterTaskWriter(
            ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter,
            Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
        {
            await _telemetryPacketInfoWriter.WriteAsync(dataItems);
        }

        /// <summary>
        /// Drains the scheduled meter-reading channel: collects records into batches
        /// (up to 10000 records or a 5 second window, whichever comes first), inserts
        /// each batch through the IoTDB provider and then hands it to
        /// <c>DeviceGroupBalanceControl.ProcessWithThrottleAsync</c>.
        /// </summary>
        /// <param name="_telemetryPacketInfoReader">Channel reader to consume from.</param>
        /// <returns>
        /// A task that completes when the channel has been marked complete and no
        /// more items are available.
        /// </returns>
        public async Task ScheduledMeterTaskReadding(
            ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
        {
            const int batchSize = 10000;
            var timeout = TimeSpan.FromSeconds(5); // maximum time spent filling one batch

            // NOTE(review): the type argument on GetMetadata was missing from the text under review - confirm.
            var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
            var taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
            var batchTimer = new Stopwatch();

            // WaitToReadAsync returns false only when the channel is completed and no
            // further items will arrive, so the loop must end there instead of
            // re-awaiting an already-finished wait in a tight spin.
            while (await _telemetryPacketInfoReader.WaitToReadAsync())
            {
                // Start the batch window only once data is available; otherwise an
                // idle period longer than the timeout would produce an empty pass.
                batchTimer.Restart();

                while (taskInfoList.Count < batchSize && batchTimer.Elapsed < timeout)
                {
                    if (_telemetryPacketInfoReader.TryRead(out var dataItem))
                    {
                        // Skip null payloads rather than letting AddRange throw.
                        if (dataItem?.Item2 != null)
                        {
                            taskInfoList.AddRange(dataItem.Item2);
                        }
                    }
                    else
                    {
                        // Nothing queued right now: wait briefly before polling again.
                        await Task.Delay(5);
                    }
                }

                if (taskInfoList.Count == 0)
                {
                    continue;
                }

                await _dbProvider.BatchInsertAsync(metadata, taskInfoList);

                await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
                    items: taskInfoList.ToList(), // copy, because the batch list is cleared below
                    deviceIdSelector: data => data.DeviceId,
                    processor: (data, groupIndex) =>
                    {
                        // TODO: the Kafka push is currently disabled. Re-enabling it means
                        // calling KafkaProducerIssuedMessageAction(topic, data, groupIndex),
                        // which requires keeping each record's topic (tuple Item1) with it.
                    }
                );

                taskInfoList.Clear();
            }
        }

        /// <summary>
        /// Pushes a single task record to Kafka.
        /// </summary>
        /// <typeparam name="T">Type of the task record.</typeparam>
        /// <param name="topicName">Topic name.</param>
        /// <param name="taskRecord">Task record to publish.</param>
        /// <param name="partition">
        /// Target partition, i.e. the group index that the concentrator number falls into.
        /// </param>
        /// <returns>A task that completes when the producer call has finished.</returns>
        /// <exception cref="ArgumentException">
        /// <paramref name="topicName"/> is null or whitespace, or <paramref name="taskRecord"/> is null.
        /// </exception>
        protected async Task KafkaProducerIssuedMessageAction<T>(string topicName, T taskRecord, int partition) where T : class
        {
            if (string.IsNullOrWhiteSpace(topicName) || taskRecord is null)
            {
                // No parameter name is passed so the message text stays exactly as before.
                throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
            }

            await _producerService.ProduceAsync(topicName, taskRecord, partition);
        }
    }
}