using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Devices;
using JiShe.CollectBus.MessageReceiveds;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;

namespace JiShe.CollectBus.Subscribers
{
    /// <summary>
    /// Application service whose methods are registered as CAP subscribers
    /// (via <c>[CapSubscribe]</c>) for the issued / received / heartbeat / login
    /// event names declared on <c>ProtocolConst</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the logger and repository dependencies are declared here without
    /// generic type arguments, and <c>GetKeyedService</c> is called without one, while
    /// the query lambdas below read entity members such as <c>MessageId</c> and
    /// <c>Number</c>. Confirm the intended type arguments against the original project.
    /// </remarks>
    public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe
    {
        /// <summary>
        /// Service key used to resolve the protocol plugin from the service provider.
        /// </summary>
        private const string StandardProtocolPluginKey = "StandardProtocolPlugin";

        private readonly ILogger _logger;
        private readonly ITcpService _tcpService;
        private readonly IServiceProvider _serviceProvider;
        private readonly IRepository _messageReceivedLoginEventRepository;
        private readonly IRepository _messageReceivedHeartbeatEventRepository;
        private readonly IRepository _messageReceivedEventRepository;
        private readonly IRepository _deviceRepository;

        /// <summary>
        /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <param name="tcpService">The TCP service.</param>
        /// <param name="serviceProvider">The service provider.</param>
        /// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
        /// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
        /// <param name="messageReceivedEventRepository">The message received event repository.</param>
        /// <param name="deviceRepository">The device repository.</param>
        public SubscriberAppService(
            ILogger logger,
            ITcpService tcpService,
            IServiceProvider serviceProvider,
            IRepository messageReceivedLoginEventRepository,
            IRepository messageReceivedHeartbeatEventRepository,
            IRepository messageReceivedEventRepository,
            IRepository deviceRepository)
        {
            _logger = logger;
            _tcpService = tcpService;
            _serviceProvider = serviceProvider;
            _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
            _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
            _messageReceivedEventRepository = messageReceivedEventRepository;
            _deviceRepository = deviceRepository;
        }

        /// <summary>
        /// Handles an issued event: for heartbeat and login events the stored received
        /// record with the same <c>MessageId</c> is marked as acknowledged, then the
        /// payload is sent over TCP to the device whose <c>Number</c> equals
        /// <c>DeviceNo</c>.
        /// </summary>
        /// <param name="issuedEventMessage">The issued event to process.</param>
        /// <returns>A task that completes when the event has been processed.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The event type is not one of the handled <see cref="IssuedEventType"/> values.
        /// </exception>
        [CapSubscribe(ProtocolConst.SubscriberIssuedEventName)]
        public async Task IssuedEvent(IssuedEventMessage issuedEventMessage)
        {
            switch (issuedEventMessage.Type)
            {
                case IssuedEventType.Heartbeat:
                    _logger.LogInformation($"IssuedEvent:{issuedEventMessage.MessageId}");
                    var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
                    heartbeatEntity.AckTime = Clock.Now;
                    heartbeatEntity.IsAck = true;
                    await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
                    break;

                case IssuedEventType.Login:
                    var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
                    loginEntity.AckTime = Clock.Now;
                    loginEntity.IsAck = true;
                    await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
                    break;

                case IssuedEventType.Data:
                    // Data events have no stored record to acknowledge; they are only forwarded.
                    break;

                default:
                    // Carry the offending value so the failure is diagnosable from the exception alone.
                    throw new ArgumentOutOfRangeException(
                        nameof(issuedEventMessage),
                        issuedEventMessage.Type,
                        "Unsupported issued event type.");
            }

            var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
            if (device is null)
            {
                // Previously the message was dropped without a trace when the device was unknown.
                _logger.LogWarning(
                    "IssuedEvent: device {DeviceNo} not found, message {MessageId} was not sent",
                    issuedEventMessage.DeviceNo,
                    issuedEventMessage.MessageId);
                return;
            }

            await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
        }

        /// <summary>
        /// Handles a received message: passes it to the protocol plugin's
        /// <c>AnalyzeAsync</c> and then inserts it into the received-message repository.
        /// If the plugin cannot be resolved, an error is logged and nothing is stored.
        /// </summary>
        /// <param name="receivedMessage">The received message.</param>
        /// <returns>A task that completes when the message has been processed.</returns>
        [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
        public async Task ReceivedEvent(MessageReceived receivedMessage)
        {
            var protocolPlugin = _serviceProvider.GetKeyedService(StandardProtocolPluginKey);
            if (protocolPlugin is null)
            {
                _logger.LogError("协议不存在!");
                return;
            }

            await protocolPlugin.AnalyzeAsync(receivedMessage);
            await _messageReceivedEventRepository.InsertAsync(receivedMessage);
        }

        /// <summary>
        /// Handles a received heartbeat: passes it to the protocol plugin's
        /// <c>HeartbeatAsync</c> and then inserts it into the heartbeat repository.
        /// If the plugin cannot be resolved, an error is logged and nothing is stored.
        /// </summary>
        /// <param name="receivedHeartbeatMessage">The received heartbeat message.</param>
        /// <returns>A task that completes when the heartbeat has been processed.</returns>
        [CapSubscribe(ProtocolConst.SubscriberReceivedHeartbeatEventName)]
        public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
        {
            _logger.LogInformation("心跳消费队列开始处理");

            var protocolPlugin = _serviceProvider.GetKeyedService(StandardProtocolPluginKey);
            if (protocolPlugin is null)
            {
                _logger.LogError("【心跳消费队列开始处理】协议不存在!");
                return;
            }

            await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
            await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
            _logger.LogInformation($"心跳消费队列完成处理:{receivedHeartbeatMessage.MessageId}");
        }

        /// <summary>
        /// Handles a received login: passes it to the protocol plugin's
        /// <c>LoginAsync</c> and then inserts it into the login repository.
        /// If the plugin cannot be resolved, an error is logged and nothing is stored.
        /// </summary>
        /// <param name="receivedLoginMessage">The received login message.</param>
        /// <returns>A task that completes when the login has been processed.</returns>
        [CapSubscribe(ProtocolConst.SubscriberReceivedLoginEventName)]
        public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
        {
            _logger.LogInformation("登录消费队列开始处理");

            var protocolPlugin = _serviceProvider.GetKeyedService(StandardProtocolPluginKey);
            if (protocolPlugin is null)
            {
                _logger.LogError("【登录消费队列开始处理】协议不存在!");
                return;
            }

            await protocolPlugin.LoginAsync(receivedLoginMessage);
            await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
            _logger.LogInformation("登录消费队列完成处理");
        }
    }
}