2025-05-12 14:02:22 +08:00
|
|
|
|
using Amazon.Runtime.Internal.Transform;
|
|
|
|
|
|
using DnsClient.Protocol;
|
|
|
|
|
|
using JiShe.CollectBus.Common;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using JiShe.CollectBus.IoTDB.Context;
|
|
|
|
|
|
using JiShe.CollectBus.IoTDB.Interface;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.LogRecord;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Internal;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.Producer;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
using JiShe.CollectBus.Protocol.Dto;
|
|
|
|
|
|
using JiShe.CollectBus.Protocol.Models;
|
|
|
|
|
|
using JiShe.CollectBus.Repository.LogRecord;
|
|
|
|
|
|
using Mapster;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
|
using Microsoft.Extensions.Options;
|
|
|
|
|
|
using System;
|
|
|
|
|
|
using System.Collections.Generic;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
using System.Collections.Immutable;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using System.Diagnostics;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
using System.Linq;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using System.Threading;
|
|
|
|
|
|
using System.Threading.Channels;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using System.Threading.Tasks;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using Volo.Abp.DependencyInjection;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
|
|
|
|
|
|
namespace JiShe.CollectBus.DataChannels
|
|
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 数据通道管理服务
|
|
|
|
|
|
/// </summary>
|
2025-04-30 12:36:54 +08:00
|
|
|
|
public class DataChannelManageService : IDataChannelManageService, ITransientDependency
|
2025-04-29 23:55:53 +08:00
|
|
|
|
{
|
|
|
|
|
|
private readonly ILogger<DataChannelManageService> _logger;
|
|
|
|
|
|
private readonly IIoTDbProvider _dbProvider;
|
|
|
|
|
|
private readonly IProducerService _producerService;
|
|
|
|
|
|
private readonly KafkaOptionConfig _kafkaOptions;
|
2025-05-18 16:04:23 +08:00
|
|
|
|
private readonly ServerApplicationOptions _applicationOptions;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
private readonly ILogRecordRepository _logRecordRepository;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
|
|
|
|
|
|
public DataChannelManageService(
|
|
|
|
|
|
ILogger<DataChannelManageService> logger,
|
2025-05-18 16:04:23 +08:00
|
|
|
|
IIoTDbProvider dbProvider,
|
2025-04-29 23:55:53 +08:00
|
|
|
|
IProducerService producerService,
|
|
|
|
|
|
IOptions<KafkaOptionConfig> kafkaOptions,
|
2025-05-12 14:02:22 +08:00
|
|
|
|
IOptions<ServerApplicationOptions> applicationOptions,
|
|
|
|
|
|
ILogRecordRepository logRecordRepository)
|
2025-04-29 23:55:53 +08:00
|
|
|
|
{
|
|
|
|
|
|
_logger = logger;
|
2025-05-18 16:04:23 +08:00
|
|
|
|
_dbProvider = dbProvider;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
_producerService = producerService;
|
|
|
|
|
|
_kafkaOptions = kafkaOptions.Value;
|
2025-05-18 16:04:23 +08:00
|
|
|
|
_applicationOptions = applicationOptions.Value;
|
2025-05-12 14:02:22 +08:00
|
|
|
|
_logRecordRepository= logRecordRepository;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
}
|
2025-04-30 12:36:54 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 定时任务数据通道写入
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <returns></returns>
|
2025-05-08 10:28:23 +08:00
|
|
|
|
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, ValueTuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
|
|
|
|
|
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
|
|
|
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 定时任务数据入库和Kafka推送通道
|
|
|
|
|
|
/// </summary>
|
2025-05-06 14:33:49 +08:00
|
|
|
|
public async Task ScheduledMeterTaskReadingAsync(
|
2025-05-08 10:28:23 +08:00
|
|
|
|
ChannelReader<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>> telemetryPacketInfoReader)
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
2025-05-13 14:51:38 +08:00
|
|
|
|
const int BatchSize = 50000;
|
2025-05-12 11:20:50 +08:00
|
|
|
|
const int EmptyWaitMilliseconds = 50;
|
2025-05-18 16:04:23 +08:00
|
|
|
|
var timeout = TimeSpan.FromMilliseconds(50);
|
2025-05-08 10:28:23 +08:00
|
|
|
|
var timer = Stopwatch.StartNew();
|
|
|
|
|
|
long timeoutMilliseconds = 0;
|
2025-05-06 14:33:49 +08:00
|
|
|
|
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
|
2025-05-18 16:04:23 +08:00
|
|
|
|
var timeoutStopwatch = Stopwatch.StartNew();
|
2025-05-06 14:33:49 +08:00
|
|
|
|
|
2025-04-30 15:57:14 +08:00
|
|
|
|
try
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
while (true)
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
var batch = new List<ValueTuple<string, List<MeterReadingTelemetryPacketInfo>>>();
|
|
|
|
|
|
var canRead = telemetryPacketInfoReader.Count;
|
|
|
|
|
|
if (canRead <= 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (timeoutMilliseconds > 0)
|
|
|
|
|
|
{
|
2025-05-13 14:51:38 +08:00
|
|
|
|
_logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 任务数据通道处理数据耗时{timeoutMilliseconds}毫秒");
|
2025-05-08 10:28:23 +08:00
|
|
|
|
}
|
|
|
|
|
|
timeoutMilliseconds = 0;
|
2025-05-12 11:20:50 +08:00
|
|
|
|
//无消息时短等待50毫秒
|
2025-05-08 10:28:23 +08:00
|
|
|
|
await Task.Delay(EmptyWaitMilliseconds);
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
timer.Restart();
|
2025-05-18 16:04:23 +08:00
|
|
|
|
timeoutStopwatch.Restart();
|
2025-05-06 14:33:49 +08:00
|
|
|
|
|
|
|
|
|
|
try
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
2025-05-06 14:33:49 +08:00
|
|
|
|
// 异步批量读取数据
|
2025-05-18 16:04:23 +08:00
|
|
|
|
while (batch != null && batch.Count < BatchSize && timeoutStopwatch.Elapsed <= timeout)
|
2025-04-30 15:57:14 +08:00
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
try
|
2025-05-06 14:33:49 +08:00
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
if (telemetryPacketInfoReader.TryRead(out var dataItem))
|
|
|
|
|
|
{
|
|
|
|
|
|
batch.Add(dataItem);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception)
|
|
|
|
|
|
{
|
|
|
|
|
|
throw;
|
2025-05-06 14:33:49 +08:00
|
|
|
|
}
|
2025-04-30 15:57:14 +08:00
|
|
|
|
}
|
2025-04-30 12:36:54 +08:00
|
|
|
|
}
|
2025-05-08 10:28:23 +08:00
|
|
|
|
catch (Exception)
|
2025-05-06 14:33:49 +08:00
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
throw;
|
2025-05-06 14:33:49 +08:00
|
|
|
|
}
|
2025-05-18 16:04:23 +08:00
|
|
|
|
|
2025-04-30 12:36:54 +08:00
|
|
|
|
|
2025-05-06 14:33:49 +08:00
|
|
|
|
if (batch.Count == 0)
|
|
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
await Task.Delay(EmptyWaitMilliseconds);
|
2025-05-06 14:33:49 +08:00
|
|
|
|
continue;
|
|
|
|
|
|
}
|
2025-04-30 12:36:54 +08:00
|
|
|
|
|
2025-05-06 14:33:49 +08:00
|
|
|
|
// 按TopicName分组处理
|
|
|
|
|
|
var topicGroups = new Dictionary<string, List<MeterReadingTelemetryPacketInfo>>();
|
|
|
|
|
|
foreach (var (topicName, records) in batch)
|
2025-04-30 15:57:14 +08:00
|
|
|
|
{
|
2025-05-06 14:33:49 +08:00
|
|
|
|
if (!topicGroups.TryGetValue(topicName, out var list))
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
2025-05-06 14:33:49 +08:00
|
|
|
|
list = new List<MeterReadingTelemetryPacketInfo>();
|
|
|
|
|
|
topicGroups[topicName] = list;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
}
|
2025-05-06 14:33:49 +08:00
|
|
|
|
list.AddRange(records);
|
2025-04-30 15:57:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-05-06 14:33:49 +08:00
|
|
|
|
// 处理每个分组
|
|
|
|
|
|
foreach (var (topicName, records) in topicGroups)
|
2025-04-30 15:57:14 +08:00
|
|
|
|
{
|
2025-05-06 14:33:49 +08:00
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
// 批量写入数据库
|
2025-05-18 16:04:23 +08:00
|
|
|
|
await _dbProvider.GetSessionPool(true).BatchInsertAsync(metadata, records);
|
2025-05-06 14:33:49 +08:00
|
|
|
|
|
2025-05-09 17:54:52 +08:00
|
|
|
|
// 限流推送Kafka
|
|
|
|
|
|
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
|
|
|
|
|
items: records,
|
|
|
|
|
|
deviceIdSelector: data => data.DeviceId,
|
|
|
|
|
|
processor: async (data, groupIndex) =>
|
|
|
|
|
|
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
|
|
|
|
|
|
);
|
2025-05-06 14:33:49 +08:00
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
2025-05-08 10:28:23 +08:00
|
|
|
|
_logger.LogError(ex, "数据通道处理主题 {TopicName} 数据时发生异常", topicName);
|
2025-05-06 14:33:49 +08:00
|
|
|
|
}
|
2025-04-30 15:57:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-05-08 10:28:23 +08:00
|
|
|
|
batch.Clear();
|
|
|
|
|
|
timer.Stop();
|
|
|
|
|
|
|
2025-05-18 16:04:23 +08:00
|
|
|
|
timeoutStopwatch.Stop();
|
2025-05-08 10:28:23 +08:00
|
|
|
|
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
}
|
2025-04-30 15:57:14 +08:00
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
2025-05-06 14:33:49 +08:00
|
|
|
|
_logger.LogCritical(ex, "定时任务处理发生致命错误");
|
2025-04-30 15:57:14 +08:00
|
|
|
|
throw;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
2025-05-06 14:33:49 +08:00
|
|
|
|
/// Kafka推送消息(增加重试机制和参数校验)
|
2025-04-30 12:36:54 +08:00
|
|
|
|
/// </summary>
|
2025-05-06 14:33:49 +08:00
|
|
|
|
protected async Task KafkaProducerIssuedMessageAction<T>(
|
|
|
|
|
|
string topicName,
|
|
|
|
|
|
T taskRecord,
|
|
|
|
|
|
int partition) where T : class
|
2025-04-30 12:36:54 +08:00
|
|
|
|
{
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
|
|
|
|
|
|
{
|
|
|
|
|
|
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-05-06 14:33:49 +08:00
|
|
|
|
const int maxRetries = 3;//重试次数
|
|
|
|
|
|
for (int retry = 0; retry < maxRetries; retry++)
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
await _producerService.ProduceAsync(topicName, taskRecord, partition);
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
2025-05-06 15:21:31 +08:00
|
|
|
|
_logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries);
|
2025-05-06 14:33:49 +08:00
|
|
|
|
if (retry == maxRetries - 1) throw;
|
|
|
|
|
|
await Task.Delay(1000 * (retry + 1));
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2025-04-30 12:36:54 +08:00
|
|
|
|
}
|
2025-05-08 10:28:23 +08:00
|
|
|
|
|
2025-04-30 12:36:54 +08:00
|
|
|
|
|
2025-05-12 14:02:22 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 日志保存
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="channelReader"></param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task LogSaveAsync(ChannelReader<object> channelReader)
|
|
|
|
|
|
{
|
|
|
|
|
|
const int BatchSize = 1000;
|
|
|
|
|
|
const int EmptyWaitMilliseconds = 1000;
|
|
|
|
|
|
var timeout = TimeSpan.FromSeconds(2);
|
|
|
|
|
|
var timer = Stopwatch.StartNew();
|
|
|
|
|
|
long timeoutMilliseconds = 0;
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
while (true)
|
|
|
|
|
|
{
|
|
|
|
|
|
var batch = new List<object>();
|
|
|
|
|
|
var canRead = channelReader.Count;
|
|
|
|
|
|
if (canRead <= 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (timeoutMilliseconds > 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(LogSaveAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒");
|
|
|
|
|
|
}
|
|
|
|
|
|
timeoutMilliseconds = 0;
|
|
|
|
|
|
//无消息时短等待1秒
|
|
|
|
|
|
await Task.Delay(EmptyWaitMilliseconds);
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
timer.Restart();
|
|
|
|
|
|
var startTime = DateTime.Now;
|
|
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
// 异步批量读取数据
|
|
|
|
|
|
while (batch != null && batch.Count < BatchSize && (DateTime.Now - startTime) < timeout)
|
|
|
|
|
|
{
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
if (channelReader.TryRead(out var dataItem))
|
|
|
|
|
|
{
|
|
|
|
|
|
batch.Add(dataItem);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception)
|
|
|
|
|
|
{
|
|
|
|
|
|
throw;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception)
|
|
|
|
|
|
{
|
|
|
|
|
|
throw;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (batch == null || batch.Count == 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
await Task.Delay(EmptyWaitMilliseconds);
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
// 按小时分组
|
|
|
|
|
|
var hourGroups = new Dictionary<DateTime, List<LogRecords>>();
|
|
|
|
|
|
DateTime? dateTime = null;
|
|
|
|
|
|
List<LogRecords> batchList = new List<LogRecords>();
|
|
|
|
|
|
int index = 1;
|
|
|
|
|
|
foreach (var item in batch)
|
|
|
|
|
|
{
|
|
|
|
|
|
var records = item.Adapt<LogRecords>();
|
|
|
|
|
|
|
|
|
|
|
|
if (!records.ReceivedTime.HasValue)
|
|
|
|
|
|
records.ReceivedTime = DateTime.Now;
|
|
|
|
|
|
var curDateTime = new DateTime(records.ReceivedTime.Value.Year, records.ReceivedTime.Value.Month, records.ReceivedTime.Value.Hour, records.ReceivedTime.Value.Day, records.ReceivedTime.Value.Hour, 0, 0);
|
|
|
|
|
|
if (!dateTime.HasValue || curDateTime != dateTime)
|
|
|
|
|
|
{
|
|
|
|
|
|
dateTime = curDateTime;
|
|
|
|
|
|
if (batchList.Count > 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
var immutableList = ImmutableList.CreateRange(batchList);
|
|
|
|
|
|
hourGroups.Add(dateTime.Value, immutableList.ToList());
|
|
|
|
|
|
batchList.Clear();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
batchList.Add(records);
|
|
|
|
|
|
// 最后一批
|
|
|
|
|
|
if(index== batch.Count)
|
|
|
|
|
|
{
|
|
|
|
|
|
var immutableList = ImmutableList.CreateRange(batchList);
|
|
|
|
|
|
hourGroups.Add(dateTime.Value, immutableList.ToList());
|
|
|
|
|
|
batchList.Clear();
|
|
|
|
|
|
}
|
|
|
|
|
|
index++;
|
|
|
|
|
|
}
|
|
|
|
|
|
foreach (var (time, records) in hourGroups)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 批量写入数据库
|
|
|
|
|
|
await _logRecordRepository.InsertManyAsync(records, time);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError(ex, "数据通道处理日志数据时发生异常");
|
|
|
|
|
|
}
|
|
|
|
|
|
batch.Clear();
|
|
|
|
|
|
timer.Stop();
|
|
|
|
|
|
|
|
|
|
|
|
timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds;
|
|
|
|
|
|
|
|
|
|
|
|
startTime = DateTime.Now;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogCritical(ex, "日志处理发生致命错误");
|
|
|
|
|
|
throw;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-29 23:55:53 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|