using System; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; using DotNetCore.CAP; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; using Volo.Abp.Caching; using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { /// /// 定时抄读任务消息消费订阅 /// [Route($"/worker/app/subscriber")] public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IRepository _deviceRepository; /// /// Initializes a new instance of the class. /// /// The logger. /// The TCP service. /// The service provider. public WorkerSubscriberAppService(ILogger logger, ITcpService tcpService, IRepository deviceRepository, IServiceProvider serviceProvider) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; _deviceRepository = deviceRepository; } #region 电表消息采集 /// /// 一分钟定时抄读任务消息消费订阅 /// /// /// [HttpPost] [Route("ammeter/oneminute/issued-event")] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } /// /// 5分钟采集电表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("ammeter/fiveminute/issued-event")] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } /// /// 15分钟采集电表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("ammeter/fifteenminute/issued-event")] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } #endregion #region 水表消息采集 /// /// 一分钟定时抄读任务消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/oneminute/issued-event")] [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } /// /// 5分钟采集电表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fiveminute/issued-event")] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } /// /// 15分钟采集电表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fifteenminute/issued-event")] [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!"); } else { var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.DeviceNo); if (device != null) { await _tcpService.SendAsync(device.ClientId, receivedMessage.Message); } } } #endregion } }