diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 18df86f..37ba6f7 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka.Producer { BootstrapServers = _kafkaOptionConfig.BootstrapServers, //AllowAutoCreateTopics = true, - QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB + QueueBufferingMaxKbytes = 4194304, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32_768, // 修改批次大小为32K LingerMs = 10, // 修改等待时间为20ms,默认为5ms diff --git a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs index e247796..2b1d8b7 100644 --- a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -20,13 +21,14 @@ namespace JiShe.CollectBus.DataChannels /// 定时任务数据通道写入 /// /// - Task ScheduledMeterTaskWriter(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems); + Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems); /// /// 定时任务数据入库和Kafka推送通道 /// /// - Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader); + Task ScheduledMeterTaskReadingAsync(ChannelReader>> _telemetryPacketInfoReader, + CancellationToken cancellationToken); #endregion } } diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index 757af18..c129194 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -1,30 +1,19 @@ -using DnsClient.Protocol; -using JiShe.CollectBus.Application.Contracts; -using JiShe.CollectBus.Common; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; -using JiShe.CollectBus.Protocol.Interfaces; -using JiShe.CollectBus.Protocol.Services; -using JiShe.CollectBus.RedisDataCache; -using JiShe.CollectBus.ScheduledMeterReading; -using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.Mvc.RazorPages; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; -using Volo.Abp.Timing; namespace JiShe.CollectBus.DataChannels { @@ -61,108 +50,136 @@ namespace JiShe.CollectBus.DataChannels /// 定时任务数据通道写入 /// /// - public async Task ScheduledMeterTaskWriter(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems) + public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems) { await _telemetryPacketInfoWriter.WriteAsync(dataItems); } + + /// /// 定时任务数据入库和Kafka推送通道 /// - /// - public async Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader) + public async Task ScheduledMeterTaskReadingAsync( + ChannelReader>> telemetryPacketInfoReader, + CancellationToken cancellationToken = default) { + const int BatchSize = 20000; // 修正批次大小 + const int EmptyWaitMilliseconds = 1000; + var timeout = TimeSpan.FromSeconds(5); + var metadata = await _dbProvider.GetMetadata(); + try { - var metadata = await _dbProvider.GetMetadata(); - var batchSize = 200_00; - var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒 - var timer = Stopwatch.StartNew(); - long timeoutMilliseconds = 0; - - List taskInfoList = new List(); - while (true) + while (!cancellationToken.IsCancellationRequested) { - var canRead = _telemetryPacketInfoReader.Count; - if (canRead <= 0) + var batchStopwatch = Stopwatch.StartNew(); + var batch = new List>>(); + + try { - if (timeoutMilliseconds > 0) + // 异步批量读取数据 + while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout) { - _logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); + while (telemetryPacketInfoReader.TryRead(out var data)) + { + batch.Add(data); + if (batch.Count >= BatchSize) break; + } + + if (batch.Count >= BatchSize) break; + + // 无更多数据时等待 + if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken)) + break; } - timeoutMilliseconds = 0; - //无消息时短等待1秒 - await Task.Delay(100_0); + } + catch (OperationCanceledException) + { + break; + } + + if (batch.Count == 0) + { + await Task.Delay(EmptyWaitMilliseconds, cancellationToken); continue; } - timer.Restart(); - var startTime = DateTime.Now; - string topicName = string.Empty; + // 按TopicName分组处理 + var topicGroups = new Dictionary>(); + foreach (var (topicName, records) in batch) + { + if (!topicGroups.TryGetValue(topicName, out var list)) + { + list = new List(); + topicGroups[topicName] = list; + } + list.AddRange(records); + } - while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout) + // 处理每个分组 + foreach (var (topicName, records) in topicGroups) { try - { - if (_telemetryPacketInfoReader.TryRead(out var dataItem)) - { - topicName = dataItem.Item1; - taskInfoList.AddRange(dataItem.Item2); - } - } - catch (Exception ee) { + // 批量写入数据库 + await _dbProvider.BatchInsertAsync(metadata, records); - throw; + // 限流推送Kafka + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: records, + deviceIdSelector: data => data.DeviceId, + processor: async (data, groupIndex) => + await KafkaProducerIssuedMessageAction(topicName, data, groupIndex) + ); + } + catch (Exception ex) + { + _logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName); } } - if (taskInfoList != null && taskInfoList.Count > 0) - { - await _dbProvider.BatchInsertAsync(metadata, taskInfoList); - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: taskInfoList.ToList(), - deviceIdSelector: data => data.DeviceId, - processor: (data, groupIndex) => - { - //_ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex); - } - ); - - taskInfoList.Clear(); - } - timer.Stop(); - - timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; - - startTime = DateTime.Now; + _logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms", + batch.Count, batchStopwatch.ElapsedMilliseconds); } } catch (Exception ex) { - + _logger.LogCritical(ex, "定时任务处理发生致命错误"); throw; } } /// - /// Kafka 推送消息 + /// Kafka推送消息(增加重试机制和参数校验) /// - /// 主题名称 - /// 任务记录 - /// 对应分区,也就是集中器号所在的分组序号 - /// - protected async Task KafkaProducerIssuedMessageAction(string topicName, - T taskRecord, int partition) where T : class + protected async Task KafkaProducerIssuedMessageAction( + string topicName, + T taskRecord, + int partition) where T : class { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } - await _producerService.ProduceAsync(topicName, taskRecord, partition); + const int maxRetries = 3;//重试次数 + for (int retry = 0; retry < maxRetries; retry++) + { + try + { + await _producerService.ProduceAsync(topicName, taskRecord, partition); + return; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Kafka推送{topicName}主题重试中({Retry}/{MaxRetries})", topicName, retry + 1, maxRetries); + if (retry == maxRetries - 1) throw; + await Task.Delay(1000 * (retry + 1)); + } + } } + } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index e707f8a..2104d28 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -33,6 +33,7 @@ using JiShe.CollectBus.DataChannels; using JiShe.CollectBus.DataMigration.Options; using static System.Runtime.InteropServices.JavaScript.JSType; using static System.Formats.Asn1.AsnWriter; +using System.Threading; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -159,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 @@ -176,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 @@ -193,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 @@ -210,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 @@ -227,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask)); }); } else @@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); }); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) @@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask)); }); } else @@ -306,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); } #region 电表采集处理 @@ -328,7 +329,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - _ = _dataChannelManage.ScheduledMeterTaskReadding(DataChannelManage.TaskDataChannel.Reader); + // 创建取消令牌源 + var cts = new CancellationTokenSource(); + + _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token); //此处代码不要删除 #if DEBUG