Merge branch 'dev' of https://310.jisheyun.com/daizan/JiShe.CollectBus into dev
This commit is contained in:
commit
09de209a7c
@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
||||
{
|
||||
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
|
||||
//AllowAutoCreateTopics = true,
|
||||
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB
|
||||
QueueBufferingMaxKbytes = 4194304, // 修改缓冲区最大为2GB,默认为1GB
|
||||
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
|
||||
BatchSize = 32_768, // 修改批次大小为32K
|
||||
LingerMs = 10, // 修改等待时间为20ms,默认为5ms
|
||||
|
||||
@ -4,6 +4,7 @@ using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
@ -20,13 +21,14 @@ namespace JiShe.CollectBus.DataChannels
|
||||
/// 定时任务数据通道写入
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
|
||||
Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems);
|
||||
|
||||
/// <summary>
|
||||
/// 定时任务数据入库和Kafka推送通道
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader);
|
||||
Task ScheduledMeterTaskReadingAsync(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader,
|
||||
CancellationToken cancellationToken);
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,30 +1,19 @@
|
||||
using DnsClient.Protocol;
|
||||
using JiShe.CollectBus.Application.Contracts;
|
||||
using JiShe.CollectBus.Common;
|
||||
using JiShe.CollectBus.Common;
|
||||
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
||||
using JiShe.CollectBus.IoTDB.Context;
|
||||
using JiShe.CollectBus.IoTDB.Interface;
|
||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||
using JiShe.CollectBus.Kafka.Internal;
|
||||
using JiShe.CollectBus.Kafka.Producer;
|
||||
using JiShe.CollectBus.Protocol.Interfaces;
|
||||
using JiShe.CollectBus.Protocol.Services;
|
||||
using JiShe.CollectBus.RedisDataCache;
|
||||
using JiShe.CollectBus.ScheduledMeterReading;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.Mvc.RazorPages;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
using Volo.Abp.Timing;
|
||||
|
||||
namespace JiShe.CollectBus.DataChannels
|
||||
{
|
||||
@ -61,108 +50,136 @@ namespace JiShe.CollectBus.DataChannels
|
||||
/// 定时任务数据通道写入
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
|
||||
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
|
||||
{
|
||||
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 定时任务数据入库和Kafka推送通道
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
|
||||
public async Task ScheduledMeterTaskReadingAsync(
|
||||
ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
const int BatchSize = 20000; // 修正批次大小
|
||||
const int EmptyWaitMilliseconds = 1000;
|
||||
var timeout = TimeSpan.FromSeconds(5);
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
|
||||
try
|
||||
{
|
||||
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
||||
var batchSize = 200_00;
|
||||
var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒
|
||||
var timer = Stopwatch.StartNew();
|
||||
long timeoutMilliseconds = 0;
|
||||
|
||||
List<MeterReadingTelemetryPacketInfo> taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
|
||||
while (true)
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
var canRead = _telemetryPacketInfoReader.Count;
|
||||
if (canRead <= 0)
|
||||
var batchStopwatch = Stopwatch.StartNew();
|
||||
var batch = new List<Tuple<string, List<MeterReadingTelemetryPacketInfo>>>();
|
||||
|
||||
try
|
||||
{
|
||||
if (timeoutMilliseconds > 0)
|
||||
// 异步批量读取数据
|
||||
while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout)
|
||||
{
|
||||
_logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
|
||||
while (telemetryPacketInfoReader.TryRead(out var data))
|
||||
{
|
||||
batch.Add(data);
|
||||
if (batch.Count >= BatchSize) break;
|
||||
}
|
||||
|
||||
if (batch.Count >= BatchSize) break;
|
||||
|
||||
// 无更多数据时等待
|
||||
if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
|
||||
break;
|
||||
}
|
||||
timeoutMilliseconds = 0;
|
||||
//无消息时短等待1秒
|
||||
await Task.Delay(100_0);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
if (batch.Count == 0)
|
||||
{
|
||||
await Task.Delay(EmptyWaitMilliseconds, cancellationToken);
|
||||
continue;
|
||||
}
|
||||
timer.Restart();
|
||||
|
||||
var startTime = DateTime.Now;
|
||||
string topicName = string.Empty;
|
||||
// 按TopicName分组处理
|
||||
var topicGroups = new Dictionary<string, List<MeterReadingTelemetryPacketInfo>>();
|
||||
foreach (var (topicName, records) in batch)
|
||||
{
|
||||
if (!topicGroups.TryGetValue(topicName, out var list))
|
||||
{
|
||||
list = new List<MeterReadingTelemetryPacketInfo>();
|
||||
topicGroups[topicName] = list;
|
||||
}
|
||||
list.AddRange(records);
|
||||
}
|
||||
|
||||
while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout)
|
||||
// 处理每个分组
|
||||
foreach (var (topicName, records) in topicGroups)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_telemetryPacketInfoReader.TryRead(out var dataItem))
|
||||
{
|
||||
topicName = dataItem.Item1;
|
||||
taskInfoList.AddRange(dataItem.Item2);
|
||||
}
|
||||
// 批量写入数据库
|
||||
await _dbProvider.BatchInsertAsync(metadata, records);
|
||||
|
||||
// 限流推送Kafka
|
||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||
items: records,
|
||||
deviceIdSelector: data => data.DeviceId,
|
||||
processor: async (data, groupIndex) =>
|
||||
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
|
||||
);
|
||||
}
|
||||
catch (Exception ee)
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
throw;
|
||||
_logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName);
|
||||
}
|
||||
}
|
||||
|
||||
if (taskInfoList != null && taskInfoList.Count > 0)
|
||||
{
|
||||
await _dbProvider.BatchInsertAsync(metadata, taskInfoList);
|
||||
|
||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
|
||||
items: taskInfoList.ToList(),
|
||||
deviceIdSelector: data => data.DeviceId,
|
||||
processor: (data, groupIndex) =>
|
||||
{
|
||||
//_ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
|
||||
}
|
||||
);
|
||||
|
||||
taskInfoList.Clear();
|
||||
}
|
||||
timer.Stop();
|
||||
|
||||
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
||||
|
||||
startTime = DateTime.Now;
|
||||
_logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms",
|
||||
batch.Count, batchStopwatch.ElapsedMilliseconds);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
_logger.LogCritical(ex, "定时任务处理发生致命错误");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Kafka 推送消息
|
||||
/// Kafka推送消息(增加重试机制和参数校验)
|
||||
/// </summary>
|
||||
/// <param name="topicName">主题名称</param>
|
||||
/// <param name="taskRecord">任务记录</param>
|
||||
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
|
||||
/// <returns></returns>
|
||||
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
|
||||
T taskRecord, int partition) where T : class
|
||||
protected async Task KafkaProducerIssuedMessageAction<T>(
|
||||
string topicName,
|
||||
T taskRecord,
|
||||
int partition) where T : class
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
||||
{
|
||||
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
|
||||
}
|
||||
|
||||
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
|
||||
const int maxRetries = 3;//重试次数
|
||||
for (int retry = 0; retry < maxRetries; retry++)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _producerService.ProduceAsync(topicName, taskRecord, partition);
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Kafka推送{topicName}主题重试中({Retry}/{MaxRetries})", topicName, retry + 1, maxRetries);
|
||||
if (retry == maxRetries - 1) throw;
|
||||
await Task.Delay(1000 * (retry + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ using JiShe.CollectBus.DataChannels;
|
||||
using JiShe.CollectBus.DataMigration.Options;
|
||||
using static System.Runtime.InteropServices.JavaScript.JSType;
|
||||
using static System.Formats.Asn1.AsnWriter;
|
||||
using System.Threading;
|
||||
|
||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
{
|
||||
@ -159,7 +160,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
|
||||
@ -176,7 +177,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
|
||||
@ -193,7 +194,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
|
||||
@ -210,7 +211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
|
||||
@ -227,7 +228,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else
|
||||
@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
|
||||
@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
|
||||
});
|
||||
}
|
||||
else
|
||||
@ -306,7 +307,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
|
||||
return;
|
||||
}
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
|
||||
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
|
||||
}
|
||||
|
||||
#region 电表采集处理
|
||||
@ -328,7 +329,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
||||
/// <returns></returns>
|
||||
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
|
||||
{
|
||||
_ = _dataChannelManage.ScheduledMeterTaskReadding(DataChannelManage.TaskDataChannel.Reader);
|
||||
// 创建取消令牌源
|
||||
var cts = new CancellationTokenSource();
|
||||
|
||||
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token);
|
||||
|
||||
//此处代码不要删除
|
||||
#if DEBUG
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user