using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; using TouchSocket.Sockets; namespace JiShe.CollectBus.Subscribers { /// /// 定时抄读任务消息消费订阅 /// public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. /// /// The logger. /// The TCP service. /// IoTDB数据驱动 public WorkerSubscriberAppService(ILogger logger, ITcpService tcpService, IIoTDbProvider dbProvider) { _logger = logger; _tcpService = tcpService; _dbProvider = dbProvider; } #region 电表消息采集 /// /// 一分钟定时抄读任务消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); return await SendMessagesAsync(receivedMessage); } /// /// 5分钟采集电表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); return await SendMessagesAsync(receivedMessage); } /// /// 15分钟采集电表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); return await SendMessagesAsync(receivedMessage); } /// /// 电表自动阀控下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)] public async Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); return await SendMessagesAsync(receivedMessage); } #endregion #region 水表消息采集 /// /// 水表数据下行消息消费订阅 /// /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { return await SendMessagesAsync(receivedMessage); } #endregion /// /// 设备报文发送 /// /// 消息记录 /// private async Task SendMessagesAsync(MeterReadingTelemetryPacketInfo receivedMessage) { try { var checkResult = _tcpService.ClientExists(receivedMessage.FocusAddress); if (checkResult) { await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString)); receivedMessage.IsSend = true; await _dbProvider.InsertAsync(receivedMessage); return SubscribeAck.Success(); } else { return SubscribeAck.Fail(); } } catch (Exception) { throw; } } } }