using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.DataChannels
{
///
/// 数据通道管理服务
///
public class DataChannelManageService : IDataChannelManageService, ITransientDependency
{
private readonly ILogger _logger;
private readonly IIoTDbProvider _dbProvider;
private readonly IProducerService _producerService;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
public DataChannelManageService(
ILogger logger,
IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
IOptions kafkaOptions,
IOptions applicationOptions)
{
_logger = logger;
_dbProvider = dbProvider;
_runtimeContext = runtimeContext;
_producerService = producerService;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
_runtimeContext.UseTableSessionPool = true;
}
///
/// 定时任务数据通道写入
///
///
public async Task ScheduledMeterTaskWriterAsync(ChannelWriter>> _telemetryPacketInfoWriter, Tuple> dataItems)
{
await _telemetryPacketInfoWriter.WriteAsync(dataItems);
}
///
/// 定时任务数据入库和Kafka推送通道
///
public async Task ScheduledMeterTaskReadingAsync(
ChannelReader>> telemetryPacketInfoReader,
CancellationToken cancellationToken = default)
{
const int BatchSize = 20000; // 修正批次大小
const int EmptyWaitMilliseconds = 1000;
var timeout = TimeSpan.FromSeconds(5);
var metadata = await _dbProvider.GetMetadata();
try
{
while (!cancellationToken.IsCancellationRequested)
{
var batchStopwatch = Stopwatch.StartNew();
var batch = new List>>();
try
{
// 异步批量读取数据
while (batch.Count < BatchSize && batchStopwatch.Elapsed < timeout)
{
while (telemetryPacketInfoReader.TryRead(out var data))
{
batch.Add(data);
if (batch.Count >= BatchSize) break;
}
if (batch.Count >= BatchSize) break;
// 无更多数据时等待
if (!await telemetryPacketInfoReader.WaitToReadAsync(cancellationToken))
break;
}
}
catch (OperationCanceledException)
{
break;
}
if (batch.Count == 0)
{
await Task.Delay(EmptyWaitMilliseconds, cancellationToken);
continue;
}
// 按TopicName分组处理
var topicGroups = new Dictionary>();
foreach (var (topicName, records) in batch)
{
if (!topicGroups.TryGetValue(topicName, out var list))
{
list = new List();
topicGroups[topicName] = list;
}
list.AddRange(records);
}
// 处理每个分组
foreach (var (topicName, records) in topicGroups)
{
try
{
// 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records);
// 限流推送Kafka
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: records,
deviceIdSelector: data => data.DeviceId,
processor: async (data, groupIndex) =>
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理主题 {TopicName} 数据时发生异常", topicName);
}
}
_logger.LogInformation("处理完成批次: {Count} 条, 耗时: {Elapsed}ms",
batch.Count, batchStopwatch.ElapsedMilliseconds);
}
}
catch (Exception ex)
{
_logger.LogCritical(ex, "定时任务处理发生致命错误");
throw;
}
}
///
/// Kafka推送消息(增加重试机制和参数校验)
///
protected async Task KafkaProducerIssuedMessageAction(
string topicName,
T taskRecord,
int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
const int maxRetries = 3;//重试次数
for (int retry = 0; retry < maxRetries; retry++)
{
try
{
await _producerService.ProduceAsync(topicName, taskRecord, partition);
return;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries);
if (retry == maxRetries - 1) throw;
await Task.Delay(1000 * (retry + 1));
}
}
}
}
}