using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Interface;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;

namespace JiShe.CollectBus.Subscribers
{
    /// <summary>
    /// Kafka subscriber application service. The "issued" handlers mark the stored
    /// login/heartbeat record as acknowledged and forward the issued message to the
    /// TCP client; the "received" handlers run the keyed protocol plugin and persist
    /// the received message.
    /// </summary>
    /// <remarks>
    /// NOTE(review): several declarations below (ILogger, IRepository, Task,
    /// GetKeyedService) carry no generic type arguments in the reviewed text. They are
    /// reproduced exactly as seen; confirm against the original file.
    /// </remarks>
    public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
    {
        private readonly ILogger _logger;
        private readonly ITcpService _tcpService;
        private readonly IServiceProvider _serviceProvider;
        private readonly IRepository _messageReceivedLoginEventRepository;
        private readonly IRepository _messageReceivedHeartbeatEventRepository;
        private readonly IRepository _messageReceivedEventRepository;
        private readonly IRepository _deviceRepository;
        private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
        private readonly IIoTDBProvider _dbProvider;

        /// <summary>
        /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <param name="tcpService">The TCP service used to send issued messages to clients.</param>
        /// <param name="serviceProvider">The service provider used to resolve the keyed protocol plugin.</param>
        /// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
        /// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
        /// <param name="messageReceivedEventRepository">The message received event repository.</param>
        /// <param name="deviceRepository">The device repository.</param>
        /// <param name="dbProvider">The IoTDB provider.</param>
        /// <param name="meterReadingRecordsRepository">The meter reading record repository.</param>
        public SubscriberAppService(ILogger logger,
            ITcpService tcpService,
            IServiceProvider serviceProvider,
            IRepository messageReceivedLoginEventRepository,
            IRepository messageReceivedHeartbeatEventRepository,
            IRepository messageReceivedEventRepository,
            IRepository deviceRepository,
            IIoTDBProvider dbProvider,
            IMeterReadingRecordRepository meterReadingRecordsRepository)
        {
            _logger = logger;
            _tcpService = tcpService;
            _serviceProvider = serviceProvider;
            _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
            _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
            _messageReceivedEventRepository = messageReceivedEventRepository;
            _deviceRepository = deviceRepository;
            _meterReadingRecordsRepository = meterReadingRecordsRepository;
            _dbProvider = dbProvider;
        }

        /// <summary>
        /// Handles an issued login reply. For <see cref="IssuedEventType.Login"/> the stored
        /// login record with the same MessageId is marked as acknowledged. The issued message
        /// is then sent to the TCP client identified by ClientId for every accepted type.
        /// </summary>
        /// <param name="issuedEventMessage">The issued event message to forward.</param>
        /// <returns>
        /// A success ack only when a login record was marked as acknowledged; otherwise a fail ack.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The message type is not Heartbeat, Login or Data.
        /// </exception>
        [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
        //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
        public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage)
        {
            bool isAck = false;
            switch (issuedEventMessage.Type)
            {
                case IssuedEventType.Heartbeat:
                    break;
                case IssuedEventType.Login:
                    _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
                    var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
                    loginEntity.AckTime = Clock.Now;
                    loginEntity.IsAck = true;
                    await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
                    isAck = true;
                    break;
                case IssuedEventType.Data:
                    break;
                default:
                    // Report which argument and which value were rejected.
                    throw new ArgumentOutOfRangeException(nameof(issuedEventMessage), issuedEventMessage.Type, "Unsupported issued event type.");
            }

            //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
            //if (device != null)
            //{
            //    await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
            //}

            // The message is sent even when no record was acknowledged above.
            await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
            return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
        }

        /// <summary>
        /// Handles an issued heartbeat reply. For <see cref="IssuedEventType.Heartbeat"/> the
        /// stored heartbeat record with the same MessageId is marked as acknowledged. The issued
        /// message is then sent to the TCP client identified by ClientId for every accepted type.
        /// </summary>
        /// <param name="issuedEventMessage">The issued event message to forward.</param>
        /// <returns>
        /// A success ack only when a heartbeat record was marked as acknowledged; otherwise a fail ack.
        /// </returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The message type is neither Heartbeat nor Data (Login is rejected here).
        /// </exception>
        [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
        //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
        public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage)
        {
            bool isAck = false;
            switch (issuedEventMessage.Type)
            {
                case IssuedEventType.Heartbeat:
                    _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
                    var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
                    heartbeatEntity.AckTime = Clock.Now;
                    heartbeatEntity.IsAck = true;
                    await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
                    isAck = true;
                    break;
                case IssuedEventType.Data:
                    break;
                default:
                    // Report which argument and which value were rejected.
                    throw new ArgumentOutOfRangeException(nameof(issuedEventMessage), issuedEventMessage.Type, "Unsupported issued event type.");
            }

            //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
            //if (device != null)
            //{
            //    await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
            //}

            // The message is sent even when no record was acknowledged above.
            await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
            return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
        }

        /// <summary>
        /// Handles a received message: analyzes it with the "StandardProtocolPlugin" keyed
        /// service and inserts a meter reading record when no matching record is found.
        /// </summary>
        /// <param name="receivedMessage">The received message.</param>
        /// <returns>
        /// Always a success ack, including when the plugin is missing or the analysis fails
        /// (those cases are only logged).
        /// </returns>
        [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
        //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
        public async Task ReceivedEvent(MessageReceived receivedMessage)
        {
            var currentTime = Clock.Now;
            var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
            if (protocolPlugin == null)
            {
                _logger.LogError("协议不存在!");
                return SubscribeAck.Success();
            }

            // TODO: parse according to the specific protocol, then run the business handling.
            TB3761 fN = await protocolPlugin.AnalyzeAsync(receivedMessage);
            if (fN == null)
            {
                _logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
                return SubscribeAck.Success();
            }

            // A null FnList is treated the same as an empty one: nothing was parsed.
            var tb3761FN = fN.FnList?.FirstOrDefault();
            if (tb3761FN == null)
            {
                _logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
                return SubscribeAck.Success();
            }

            // Build the record used to persist the raw message.
            var entity = new MeterReadingRecords()
            {
                ReceivedMessageHexString = receivedMessage.MessageHexString,
                AFN = fN.Afn,
                Fn = tb3761FN.Fn,
                Pn = 0,
                FocusAddress = "",
                MeterAddress = "",
            };

            // Insert when no record exists.
            // NOTE(review): the original comment also promised an update when a record
            // exists, but no update is performed here - confirm the intended behavior.
            var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
            if (updateEntity == null)
            {
                await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
            }

            //_dbProvider.InsertAsync();
            // TODO: look up whether there is a matching issued task.

            //await _messageReceivedEventRepository.InsertAsync(receivedMessage);
            return SubscribeAck.Success();
        }

        /// <summary>
        /// Handles a received heartbeat: passes it to the "StandardProtocolPlugin" keyed
        /// service and then inserts it into the heartbeat repository.
        /// </summary>
        /// <param name="receivedHeartbeatMessage">The received heartbeat message.</param>
        /// <returns>Always a success ack; a missing plugin is only logged.</returns>
        [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
        //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
        public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage)
        {
            var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
            if (protocolPlugin == null)
            {
                _logger.LogError("协议不存在!");
            }
            else
            {
                await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
                await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
            }

            return SubscribeAck.Success();
        }

        /// <summary>
        /// Handles a received login: passes it to the "StandardProtocolPlugin" keyed
        /// service and then inserts it into the login repository.
        /// </summary>
        /// <param name="receivedLoginMessage">The received login message.</param>
        /// <returns>Always a success ack; a missing plugin is only logged.</returns>
        [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
        //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
        public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage)
        {
            var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
            if (protocolPlugin == null)
            {
                _logger.LogError("协议不存在!");
            }
            else
            {
                await protocolPlugin.LoginAsync(receivedLoginMessage);
                await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
            }

            return SubscribeAck.Success();
        }
    }
}