2025-04-30 12:36:54 +08:00
|
|
|
|
using DnsClient.Protocol;
|
|
|
|
|
|
using JiShe.CollectBus.Application.Contracts;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using JiShe.CollectBus.Common;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using JiShe.CollectBus.IoTDB.Context;
|
|
|
|
|
|
using JiShe.CollectBus.IoTDB.Interface;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Internal;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.Producer;
|
|
|
|
|
|
using JiShe.CollectBus.Protocol.Interfaces;
|
|
|
|
|
|
using JiShe.CollectBus.Protocol.Services;
|
|
|
|
|
|
using JiShe.CollectBus.RedisDataCache;
|
|
|
|
|
|
using JiShe.CollectBus.ScheduledMeterReading;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using Microsoft.AspNetCore.Mvc;
|
|
|
|
|
|
using Microsoft.AspNetCore.Mvc.RazorPages;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
|
using Microsoft.Extensions.Options;
|
|
|
|
|
|
using System;
|
|
|
|
|
|
using System.Collections.Generic;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using System.Diagnostics;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using System.Linq;
|
|
|
|
|
|
using System.Text;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using System.Threading;
|
|
|
|
|
|
using System.Threading.Channels;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
using System.Threading.Tasks;
|
2025-04-30 12:36:54 +08:00
|
|
|
|
using Volo.Abp.DependencyInjection;
|
|
|
|
|
|
using Volo.Abp.Timing;
|
2025-04-29 23:55:53 +08:00
|
|
|
|
|
|
|
|
|
|
namespace JiShe.CollectBus.DataChannels
|
|
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
/// Data channel management service.
/// Writes scheduled meter-reading telemetry packets into a channel and, on the consuming side,
/// drains the channel in batches and hands each batch to the IoTDB provider and to the
/// device-group throttled processor.
/// </summary>
public class DataChannelManageService : IDataChannelManageService, ITransientDependency
{
    /// <summary>Maximum number of packets accumulated before a batch is flushed.</summary>
    private const int BatchSize = 10000;

    /// <summary>Maximum time spent accumulating one batch once data is available (5 seconds).</summary>
    private static readonly TimeSpan BatchTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Pause between polls while the channel is momentarily empty inside a batch window.</summary>
    private const int EmptyPollDelayMilliseconds = 5;

    private readonly ILogger<DataChannelManageService> _logger;
    private readonly IIoTDbProvider _dbProvider;
    private readonly IProducerService _producerService;
    private readonly KafkaOptionConfig _kafkaOptions;
    private readonly ServerApplicationOptions _applicationOptions;
    private readonly IoTDBRuntimeContext _runtimeContext;

    /// <summary>
    /// Creates the service and stores the injected dependencies.
    /// </summary>
    /// <param name="logger">Logger for this service.</param>
    /// <param name="dbProvider">IoTDB provider used for metadata lookup and batch inserts.</param>
    /// <param name="runtimeContext">IoTDB runtime context; its <c>UseTableSessionPool</c> flag is set to <c>true</c> here.</param>
    /// <param name="producerService">Kafka producer used by <see cref="KafkaProducerIssuedMessageAction{T}"/>.</param>
    /// <param name="kafkaOptions">Kafka configuration options.</param>
    /// <param name="applicationOptions">Server application options.</param>
    public DataChannelManageService(
        ILogger<DataChannelManageService> logger,
        IIoTDbProvider dbProvider,
        IoTDBRuntimeContext runtimeContext,
        IProducerService producerService,
        IOptions<KafkaOptionConfig> kafkaOptions,
        IOptions<ServerApplicationOptions> applicationOptions)
    {
        _logger = logger;
        _dbProvider = dbProvider;
        _runtimeContext = runtimeContext;
        _producerService = producerService;
        _kafkaOptions = kafkaOptions.Value;
        _applicationOptions = applicationOptions.Value;

        // NOTE(review): mutates the injected runtime context; presumably this switches the
        // provider to the table-model session pool - confirm against IoTDBRuntimeContext.
        _runtimeContext.UseTableSessionPool = true;
    }

    /// <summary>
    /// Writes one item produced by a scheduled task into the data channel.
    /// </summary>
    /// <param name="_telemetryPacketInfoWriter">Channel writer that receives the item.</param>
    /// <param name="dataItems">Tuple of a string key and the telemetry packets to enqueue.</param>
    /// <returns>A task that completes when the item has been written to the channel.</returns>
    public async Task ScheduledMeterTaskWriter(ChannelWriter<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoWriter, Tuple<string, List<MeterReadingTelemetryPacketInfo>> dataItems)
    {
        await _telemetryPacketInfoWriter.WriteAsync(dataItems);
    }

    /// <summary>
    /// Consumes the scheduled-task data channel: accumulates packets into batches, inserts each
    /// batch through the IoTDB provider, then passes it to the device-group throttled processor.
    /// </summary>
    /// <remarks>
    /// A batch is flushed when it reaches <see cref="BatchSize"/> packets or when
    /// <see cref="BatchTimeout"/> has elapsed since data became available, whichever comes first.
    /// The method returns once the channel has been completed and fully drained.
    /// </remarks>
    /// <param name="_telemetryPacketInfoReader">Channel reader to consume from.</param>
    /// <returns>A task that completes when the channel is completed and no more data can be read.</returns>
    public async Task ScheduledMeterTaskReadding(ChannelReader<Tuple<string, List<MeterReadingTelemetryPacketInfo>>> _telemetryPacketInfoReader)
    {
        var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();

        var taskInfoList = new List<MeterReadingTelemetryPacketInfo>();
        var batchTimer = new Stopwatch();

        // WaitToReadAsync returns false only when the channel is completed and will never
        // produce data again. Leaving the loop at that point avoids spinning forever.
        while (await _telemetryPacketInfoReader.WaitToReadAsync())
        {
            // Start the batch window when data is actually available, using a monotonic clock,
            // so time spent idle in WaitToReadAsync does not consume the window.
            batchTimer.Restart();

            while (taskInfoList.Count < BatchSize && batchTimer.Elapsed < BatchTimeout)
            {
                if (_telemetryPacketInfoReader.TryRead(out var dataItem))
                {
                    if (dataItem?.Item2 != null)
                    {
                        taskInfoList.AddRange(dataItem.Item2);
                    }
                    else
                    {
                        // A null payload would otherwise throw and terminate the consumer loop.
                        _logger.LogWarning("{Method} received an empty channel item and skipped it.", nameof(ScheduledMeterTaskReadding));
                    }
                }
                else
                {
                    // Nothing queued right now: wait briefly before polling again.
                    await Task.Delay(EmptyPollDelayMilliseconds);
                }
            }

            if (taskInfoList.Count == 0)
            {
                continue;
            }

            await _dbProvider.BatchInsertAsync(metadata, taskInfoList);

            await DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
                // Pass a snapshot so the list can be cleared below regardless of how the
                // processor holds on to the items.
                items: taskInfoList.ToList(),
                deviceIdSelector: data => data.DeviceId,
                processor: (data, groupIndex) =>
                {
                    // Kafka push is currently disabled. Original call kept for reference;
                    // note that "dateItem" is not a variable in scope here.
                    // _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex);
                }
            );

            taskInfoList.Clear();
        }
    }

    /// <summary>
    /// Pushes a message to Kafka.
    /// </summary>
    /// <typeparam name="T">Type of the record being published.</typeparam>
    /// <param name="topicName">Topic name.</param>
    /// <param name="taskRecord">Task record to publish.</param>
    /// <param name="partition">Target partition, i.e. the group index of the concentrator number.</param>
    /// <returns>A task that completes when the producer call has finished.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="topicName"/> is null, empty or whitespace, or <paramref name="taskRecord"/> is null.
    /// </exception>
    protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
        T taskRecord, int partition) where T : class
    {
        if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
        {
            // ArgumentException derives from Exception, so existing catch (Exception) handlers
            // still apply; the message text is unchanged.
            throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
        }

        await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
    }
}
|
|
|
|
|
|
}
|