优化数据通道读取速度,并推送Kafka

This commit is contained in:
ChenYi 2025-05-06 14:33:49 +08:00
parent 53fa6f503c
commit 01264dd3ec
4 changed files with 107 additions and 84 deletions

View File

@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
//AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
QueueBufferingMaxKbytes = 4194304, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 10, // 修改等待时间为20ms默认为5ms

View File

@ -4,6 +4,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@ -20,13 +21,14 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入
/// </summary>
/// <returns></returns>
Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
/// <summary>
/// 定时任务数据入库和Kafka推送通道
/// </summary>
/// <returns></returns>
Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader);
Task ScheduledMeterTaskReadingAsync(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader,
CancellationToken cancellationToken);
#endregion
}
}

View File

@ -1,30 +1,19 @@
using DnsClient.Protocol;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Services;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Timing;
namespace JiShe.CollectBus.DataChannels
{
@ -61,108 +50,136 @@ namespace JiShe.CollectBus.DataChannels
/// 定时任务数据通道写入
/// </summary>
/// <returns></returns>
public async Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
{
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
}
/// <summary>
/// 定时任务数据入库和Kafka推送通道
/// </summary>
/// <returns></returns>
public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
public async Task ScheduledMeterTaskReadingAsync(
ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader,
CancellationToken cancellationToken = default)
{
const int BatchSize = 20000; // 修正批次大小
const int EmptyWaitMilliseconds = 1000;
var timeout = TimeSpan.FromSeconds(5);
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
try
{
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
var batchSize = 200_00;
var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
var timer = Stopwatch.StartNew();
long timeoutMilliseconds = 0;
List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
while (true)
while (!cancellationToken.IsCancellationRequested)
{
var canRead = _telemetryPacketInfoReader.Count;
if (canRead <= 0)
var batchStopwatch = Stopwatch.StartNew();
var batch = new List<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>();
try
{
if (timeoutMilliseconds > 0)
// 异步批量读取数据
while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout)
{
_logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
while (telemetryPacketInfoReader.TryRead(out var data))
{
batch.Add(data);
if (batch.Count >= BatchSize) break;
}
if (batch.Count >= BatchSize) break;
// 无更多数据时等待
if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
break;
}
timeoutMilliseconds = 0;
//无消息时短等待1秒
await Task.Delay(100_0);
}
catch (OperationCanceledException)
{
break;
}
if (batch.Count == 0)
{
await Task.Delay(EmptyWaitMilliseconds, cancellationToken);
continue;
}
timer.Restart();
var startTime = DateTime.Now;
string topicName = string.Empty;
// 按TopicName分组处理
var topicGroups = new Dictionary<string, List<MeterReadingTelemetryPacketInfo>>();
foreach (var (topicName, records) in batch)
{
if (!topicGroups.TryGetValue(topicName, out var list))
{
list = new List<MeterReadingTelemetryPacketInfo>();
topicGroups[topicName] = list;
}
list.AddRange(records);
}
while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
// 处理每个分组
foreach (var (topicName, records) in topicGroups)
{
try
{
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
{
topicName = dataItem.Item1;
taskInfoList.AddRange(dataItem.Item2);
}
// 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records);
// 限流推送Kafka
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: records,
deviceIdSelector: data => data.DeviceId,
processor: async (data, groupIndex) =>
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
);
}
catch (Exception ee)
catch (Exception ex)
{
throw;
_logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName);
}
}
if (taskInfoList != null && taskInfoList.Count > 0)
{
await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
items: taskInfoList.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
//_ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
}
);
taskInfoList.Clear();
}
timer.Stop();
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
startTime = DateTime.Now;
_logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms",
batch.Count, batchStopwatch.ElapsedMilliseconds);
}
}
catch (Exception ex)
{
_logger.LogCritical(ex, "定时任务处理发生致命错误");
throw;
}
}
/// <summary>
/// Kafka 推送消息
/// Kafka推送消息(增加重试机制和参数校验)
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
T taskRecord, int partition) where T : class
protected async Task KafkaProducerIssuedMessageAction<T>(
string topicName,
T taskRecord,
int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
const int maxRetries = 3;//重试次数
for (int retry = 0; retry < maxRetries; retry++)
{
try
{
await _producerService.ProduceAsync(topicName, taskRecord, partition);
return;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Kafka推送{topicName}主题重试中({Retry}/{MaxRetries})", topicName, retry + 1, maxRetries);
if (retry == maxRetries - 1) throw;
await Task.Delay(1000 * (retry + 1));
}
}
}
}
}

View File

@ -33,6 +33,7 @@ using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static System.Formats.Asn1.AsnWriter;
using System.Threading;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -159,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
@ -176,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
@ -193,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
@ -210,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
@ -227,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else
@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
});
}
else
@ -306,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
}
#region
@ -328,7 +329,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
_ = _dataChannelManage.ScheduledMeterTaskReadding(DataChannelManage.TaskDataChannel.Reader);
// 创建取消令牌源
var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token);
//此处代码不要删除
#if DEBUG