// 2025-05-18 16:04:23 +08:00

// 273 lines
// 12 KiB
// C#
// Raw Permalink Blame History

// This file contains ambiguous Unicode characters

// This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Subscribers
{
/// <summary>
/// Kafka subscriber for the downlink (issued) messages of the scheduled and manual
/// meter-reading / valve-control tasks. Each handler logs the received packet and
/// forwards it to the target TCP client through <see cref="SendMessagesAsync"/>.
/// </summary>
public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe
{
    private readonly ILogger<WorkerSubscriberAppService> _logger;
    private readonly ITcpService _tcpService;
    private readonly IIoTDbProvider _dbProvider;

    /// <summary>
    /// Initializes a new instance of the <see cref="WorkerSubscriberAppService"/> class.
    /// </summary>
    /// <param name="logger">The logger.</param>
    /// <param name="tcpService">The TCP service used to reach connected clients.</param>
    /// <param name="dbProvider">The IoTDB data provider used to persist sent packets.</param>
    public WorkerSubscriberAppService(ILogger<WorkerSubscriberAppService> logger,
        ITcpService tcpService,
        IIoTDbProvider dbProvider)
    {
        _logger = logger;
        _tcpService = tcpService;
        _dbProvider = dbProvider;
    }

    #region Ammeter (electricity meter) subscriptions

    /// <summary>
    /// Handles the one-minute scheduled ammeter reading downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>The acknowledgement produced by <see cref="SendMessagesAsync"/>.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)]
    public async Task<ISubscribeAck> AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        return await SendMessagesAsync(receivedMessage);
    }

    /// <summary>
    /// Handles the five-minute scheduled ammeter reading downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>The acknowledgement produced by <see cref="SendMessagesAsync"/>.</returns>
    // NOTE(review): the attribute enables batching (EnableBatch/BatchSize) but, unlike the other
    // batch handlers in this class, the parameter is a single packet rather than a List.
    // Confirm how the Kafka dispatcher binds this handler before relying on batch delivery.
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, EnableBatch = true, BatchSize = 500)]
    public async Task<ISubscribeAck> AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        return await SendMessagesAsync(receivedMessage);
    }

    /// <summary>
    /// Handles a batch of fifteen-minute scheduled ammeter reading downlink messages.
    /// </summary>
    /// <param name="receivedMessage">The batch of packets to send, processed sequentially.</param>
    /// <returns>Always a success acknowledgement once every item has been processed.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, EnableBatch = true, TaskCount = 30, BatchSize = 500)]
    public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List<MeterReadingTelemetryPacketInfo> receivedMessage)
    {
        foreach (var item in receivedMessage)
        {
            _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理_ItemCode_{ItemCode}_{Message}", item.ItemCode, item.Serialize());
            // NOTE(review): the per-item send result is discarded and the batch is always
            // acknowledged; confirm this is the intended delivery semantics.
            await SendMessagesAsync(item);
        }

        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a batch of automatic ammeter valve-control downlink messages.
    /// </summary>
    /// <param name="receivedMessage">The batch of packets to send, processed sequentially.</param>
    /// <returns>Always a success acknowledgement once every item has been processed.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, EnableBatch = true, BatchSize = 500)]
    public async Task<ISubscribeAck> AmmeterScheduledAutoValveControl(List<MeterReadingTelemetryPacketInfo> receivedMessage)
    {
        // TODO: for time-window automatic valve control, check the current time and only send the
        // command while it is still inside the configured window (especially when the queue is
        // blocked or heavily delayed), to avoid causing a production incident.
        foreach (var item in receivedMessage)
        {
            _logger.LogInformation("电表自动阀控下行消息消费队列开始处理:{Message}", item.Serialize());
            await SendMessagesAsync(item);
        }

        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a batch of "other" downlink messages: daily/monthly freeze, concentrator version,
    /// SIM card number, scheduled time calibration and similar.
    /// </summary>
    /// <param name="receivedMessage">The batch of packets to send, processed sequentially.</param>
    /// <returns>Always a success acknowledgement once every item has been processed.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, EnableBatch = true, BatchSize = 500)]
    public async Task<ISubscribeAck> AmmeterScheduledOther(List<MeterReadingTelemetryPacketInfo> receivedMessage)
    {
        foreach (var item in receivedMessage)
        {
            _logger.LogInformation("其他采集数据下行消息消费队列开始处理:{Message}", item.Serialize());
            await SendMessagesAsync(item);
        }

        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a manual ammeter valve-control downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveControlIssuedEventName)]
    public async Task<ISubscribeAck> AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("电表手动阀控下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a manual ammeter reading downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName)]
    public async Task<ISubscribeAck> AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("电表手动抄读下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a retried ammeter downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to resend.</param>
    /// <returns>Always a success acknowledgement, whether or not the client was reachable.</returns>
    [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerRetryEventName)]
    public async Task<ISubscribeAck> AmmeterScheduledMeterRetryReadingEvent(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("重试电表数据下行消息消费队列开始处理");

        // Same send-and-persist steps as every other handler; exceptions still propagate.
        await SendMessagesAsync(receivedMessage);
        // TODO: on the 4th attempt the message should be pushed to local alert handling.

        // There are 3 retry chances, so every retry message is acknowledged
        // regardless of the send result.
        return SubscribeAck.Success();
    }

    #endregion

    #region Water meter subscriptions

    /// <summary>
    /// Handles a water meter automatic reading downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)]
    public async Task<ISubscribeAck> WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("水表数据下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a water meter automatic valve-control downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoValveControlIssuedEventName)]
    public async Task<ISubscribeAck> WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("水表自动阀控下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a water meter manual valve-control downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveControlIssuedEventName)]
    public async Task<ISubscribeAck> WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("水表手动阀控下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    /// <summary>
    /// Handles a water meter manual reading downlink message.
    /// </summary>
    /// <param name="receivedMessage">The packet to send.</param>
    /// <returns>Always a success acknowledgement; the send result is not propagated.</returns>
    [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveReadingIssuedEventName)]
    public async Task<ISubscribeAck> WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        _logger.LogInformation("水表手动抄读下行消息消费队列开始处理:{Message}", receivedMessage.Serialize());
        await SendMessagesAsync(receivedMessage);
        return SubscribeAck.Success();
    }

    #endregion

    /// <summary>
    /// Sends the packet's hex-encoded payload to the TCP client identified by
    /// <c>FocusAddress</c> and, when sent, persists the updated packet record.
    /// </summary>
    /// <param name="receivedMessage">The packet record to send and persist.</param>
    /// <returns>
    /// A success acknowledgement when the client exists and the packet was sent and stored;
    /// a failure acknowledgement when no such client is currently registered.
    /// </returns>
    /// <remarks>
    /// Exceptions from decoding, sending or persisting are logged and rethrown unchanged.
    /// </remarks>
    private async Task<ISubscribeAck> SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage)
    {
        try
        {
            if (!_tcpService.ClientExists(receivedMessage.FocusAddress))
            {
                // Most callers discard the returned ack, so make the skipped send visible.
                _logger.LogWarning("TCP客户端不存在, 报文未发送, FocusAddress: {FocusAddress}", receivedMessage.FocusAddress);
                return SubscribeAck.Fail();
            }

            var payload = Convert.FromHexString(receivedMessage.IssuedMessageHexString);
            await _tcpService.SendAsync(receivedMessage.FocusAddress, payload);

            // Record the send: mark as sent, bump the send counter and
            // set the next send time one hour ahead.
            receivedMessage.IsSend = true;
            receivedMessage.SendNum += 1;
            receivedMessage.NextSendTime = DateTime.Now.AddHours(1);
            await _dbProvider.GetSessionPool(true).InsertAsync(receivedMessage);

            return SubscribeAck.Success();
        }
        catch (Exception ex)
        {
            // Add the failing address for diagnosis, then rethrow with the original stack trace.
            _logger.LogError(ex, "设备报文发送失败, FocusAddress: {FocusAddress}", receivedMessage?.FocusAddress);
            throw;
        }
    }
}
}