From 01264dd3ec93d721adb5035708dceb23569059b9 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Tue, 6 May 2025 14:33:49 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E9=80=9A?=
=?UTF-8?q?=E9=81=93=E8=AF=BB=E5=8F=96=E9=80=9F=E5=BA=A6=EF=BC=8C=E5=B9=B6?=
=?UTF-8?q?=E6=8E=A8=E9=80=81Kafka?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Producer/ProducerService.cs | 2 +-
.../DataChannels/IDataChannelManageService.cs | 6 +-
.../DataChannels/DataChannelManageService.cs | 161 ++++++++++--------
.../BasicScheduledMeterReadingService.cs | 22 ++-
4 files changed, 107 insertions(+), 84 deletions(-)
diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
index 18df86f..37ba6f7 100644
--- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
@@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
//AllowAutoCreateTopics = true,
- QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB
+ QueueBufferingMaxKbytes = 4194304, // 修改缓冲区最大为2GB,默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 10, // 修改等待时间为20ms,默认为5ms
diff --git a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs
index e247796..2b1d8b7 100644
--- a/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs
+++ b/services/JiShe.CollectBus.Application.Contracts/DataChannels/IDataChannelManageService.cs
@@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
+using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -20,13 +21,14 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入
///
///
- Task ScheduledMeterTaskWriter(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems);
+ Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems);
///
/// 定时任务数据入库和Kafka推送通道
///
///
- Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader);
+ Task ScheduledMeterTaskReadingAsync(ChannelReader>> _telemetryPacketInfoReader,
+ CancellationToken cancellationToken);
#endregion
}
}
diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
index 757af18..c129194 100644
--- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
+++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs
@@ -1,30 +1,19 @@
-using DnsClient.Protocol;
-using JiShe.CollectBus.Application.Contracts;
-using JiShe.CollectBus.Common;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
-using JiShe.CollectBus.Protocol.Interfaces;
-using JiShe.CollectBus.Protocol.Services;
-using JiShe.CollectBus.RedisDataCache;
-using JiShe.CollectBus.ScheduledMeterReading;
-using Microsoft.AspNetCore.Mvc;
-using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
-using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
-using Volo.Abp.Timing;
namespace JiShe.CollectBus.DataChannels
{
@@ -61,108 +50,136 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入
///
///
- public async Task ScheduledMeterTaskWriter(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems)
+ public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems)
{
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
}
+
+
///
/// 定时任务数据入库和Kafka推送通道
///
- ///
- public async Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader)
+ public async Task ScheduledMeterTaskReadingAsync(
+ ChannelReader>> telemetryPacketInfoReader,
+ CancellationToken cancellationToken = default)
{
+ const int BatchSize = 20000; // 修正批次大小
+ const int EmptyWaitMilliseconds = 1000;
+ var timeout = TimeSpan.FromSeconds(5);
+ var metadata = await _dbProvider.GetMetadata();
+
try
{
- var metadata = await _dbProvider.GetMetadata();
- var batchSize = 200_00;
- var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
- var timer = Stopwatch.StartNew();
- long timeoutMilliseconds = 0;
-
- List taskInfoList = new List();
- while (true)
+ while (!cancellationToken.IsCancellationRequested)
{
- var canRead = _telemetryPacketInfoReader.Count;
- if (canRead <= 0)
+ var batchStopwatch = Stopwatch.StartNew();
+ var batch = new List>>();
+
+ try
{
- if (timeoutMilliseconds > 0)
+ // 异步批量读取数据
+ while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout)
{
- _logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
+ while (telemetryPacketInfoReader.TryRead(out var data))
+ {
+ batch.Add(data);
+ if (batch.Count >= BatchSize) break;
+ }
+
+ if (batch.Count >= BatchSize) break;
+
+ // 无更多数据时等待
+ if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
+ break;
}
- timeoutMilliseconds = 0;
- //无消息时短等待1秒
- await Task.Delay(100_0);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+
+ if (batch.Count == 0)
+ {
+ await Task.Delay(EmptyWaitMilliseconds, cancellationToken);
continue;
}
- timer.Restart();
- var startTime = DateTime.Now;
- string topicName = string.Empty;
+ // 按TopicName分组处理
+ var topicGroups = new Dictionary>();
+ foreach (var (topicName, records) in batch)
+ {
+ if (!topicGroups.TryGetValue(topicName, out var list))
+ {
+ list = new List();
+ topicGroups[topicName] = list;
+ }
+ list.AddRange(records);
+ }
- while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
+ // 处理每个分组
+ foreach (var (topicName, records) in topicGroups)
{
try
- {
- if (_telemetryPacketInfoReader.TryRead(out var dataItem))
- {
- topicName = dataItem.Item1;
- taskInfoList.AddRange(dataItem.Item2);
- }
- }
- catch (Exception ee)
{
+ // 批量写入数据库
+ await _dbProvider.BatchInsertAsync(metadata, records);
- throw;
+ // 限流推送Kafka
+ await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
+ items: records,
+ deviceIdSelector: data => data.DeviceId,
+ processor: async (data, groupIndex) =>
+ await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
+ );
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName);
}
}
- if (taskInfoList != null && taskInfoList.Count > 0)
- {
- await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
-
- await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
- items: taskInfoList.ToList(),
- deviceIdSelector: data => data.DeviceId,
- processor: (data, groupIndex) =>
- {
- //_ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
- }
- );
-
- taskInfoList.Clear();
- }
- timer.Stop();
-
- timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
-
- startTime = DateTime.Now;
+ _logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms",
+ batch.Count, batchStopwatch.ElapsedMilliseconds);
}
}
catch (Exception ex)
{
-
+ _logger.LogCritical(ex, "定时任务处理发生致命错误");
throw;
}
}
///
- /// Kafka 推送消息
+ /// Kafka推送消息(增加重试机制和参数校验)
///
- /// 主题名称
- /// 任务记录
- /// 对应分区,也就是集中器号所在的分组序号
- ///
- protected async Task KafkaProducerIssuedMessageAction(string topicName,
- T taskRecord, int partition) where T : class
+ protected async Task KafkaProducerIssuedMessageAction(
+ string topicName,
+ T taskRecord,
+ int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
- await _producerService.ProduceAsync(topicName, taskRecord, partition);
+ const int maxRetries = 3;//重试次数
+ for (int retry = 0; retry < maxRetries; retry++)
+ {
+ try
+ {
+ await _producerService.ProduceAsync(topicName, taskRecord, partition);
+ return;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Kafka推送{topicName}主题重试中({Retry}/{MaxRetries})", topicName, retry + 1, maxRetries);
+ if (retry == maxRetries - 1) throw;
+ await Task.Delay(1000 * (retry + 1));
+ }
+ }
}
+
}
}
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index e707f8a..2104d28 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -33,6 +33,7 @@ using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static System.Formats.Asn1.AsnWriter;
+using System.Threading;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -159,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
@@ -176,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
@@ -193,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
@@ -210,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
@@ -227,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else
@@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
});
}
else
@@ -306,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return;
}
- _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
+ _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
}
#region 电表采集处理
@@ -328,7 +329,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
- _ = _dataChannelManage.ScheduledMeterTaskReadding(DataChannelManage.TaskDataChannel.Reader);
+ // 创建取消令牌源
+ var cts = new CancellationTokenSource();
+
+ _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token);
//此处代码不要删除
#if DEBUG