using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Threading.Tasks; using TouchSocket.Sockets; using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.Subscribers { /// /// 定时抄读任务消息消费订阅 /// public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. /// /// The logger. /// The TCP service. /// IoTDB数据驱动 public WorkerSubscriberAppService(ILogger logger, ITcpService tcpService, IIoTDbProvider dbProvider) { _logger = logger; _tcpService = tcpService; _dbProvider = dbProvider; } #region 电表消息采集 /// /// 一分钟定时抄读任务消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"1分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); return await SendMessagesAsync(receivedMessage); } /// /// 5分钟采集电表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, EnableBatch = true, BatchSize = 500)] public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"5分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); return await SendMessagesAsync(receivedMessage); } /// /// 15分钟采集电表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, EnableBatch = true, TaskCount = 30, BatchSize = 500)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List receivedMessage) { foreach (var item in receivedMessage) { _logger.LogError($"15分钟采集电表数据下行消息消费队列开始处理_ItemCode_{item.ItemCode}_{item.Serialize()}"); await SendMessagesAsync(item); } return SubscribeAck.Success(); } /// /// 电表自动阀控下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, EnableBatch = true, BatchSize = 500)] public async Task AmmeterScheduledAutoValveControl(List receivedMessage) { //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 foreach (var item in receivedMessage) { _logger.LogError($"电表自动阀控下行消息消费队列开始处理:{item.Serialize()}"); await SendMessagesAsync(item); } return SubscribeAck.Success(); } /// /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、SIM卡号、定时校时等下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, EnableBatch = true, BatchSize = 500)] public async Task AmmeterScheduledOther(List receivedMessage) { foreach (var item in receivedMessage) { _logger.LogError($"其他采集数据下行消息消费队列开始处理:{item.Serialize()}"); await SendMessagesAsync(item); } return SubscribeAck.Success(); } /// /// 电表手动阀控下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveControlIssuedEventName)] public async Task AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"电表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } /// /// 电表手动抄读下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName)] public async Task AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"电表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } /// /// 重试电表下行消息消费队列 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerRetryEventName)] public async Task AmmeterScheduledMeterRetryReadingEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("重试电表数据下行消息消费队列开始处理"); //return await SendMessagesAsync(receivedMessage); try { var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress); if (checkResult) { await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); receivedMessage.IsSend = true; // 增加·发送次数和重试开始时间 receivedMessage.SendNum += 1; receivedMessage.NextSendTime = DateTime.Now.AddHours(1); await _dbProvider.InsertAsync(receivedMessage); // TODO: 第4次的时候会推送到地方预警处理 } // 由于有3次重试机会,故每次消息都会被确认 return SubscribeAck.Success(); } catch (Exception) { throw; } } #endregion #region 水表消息采集 /// /// 水表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] public async Task WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"水表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } /// /// 水表自动阀控下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoValveControlIssuedEventName)] public async Task WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"水表自动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } /// /// 水表手动阀控下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveControlIssuedEventName)] public async Task WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"水表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } /// /// 水表手动抄读下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveReadingIssuedEventName)] public async Task WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogError($"水表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}"); await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); } #endregion /// /// 设备报文发送 /// /// 消息记录 /// private async Task SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage) { try { var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress); if (checkResult) { await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); receivedMessage.IsSend = true; // 增加·发送次数和重试开始时间 receivedMessage.SendNum += 1; receivedMessage.NextSendTime = DateTime.Now.AddHours(1); await _dbProvider.InsertAsync(receivedMessage); return SubscribeAck.Success(); } else { return SubscribeAck.Fail(); } } catch (Exception) { throw; } } } }