using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; using System.Linq; using System.Threading.Tasks; using JiShe.CollectBus.IoTDB.Interface; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; using System.Collections.Generic; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.Protocol.Dto; using System.Collections; namespace JiShe.CollectBus.Subscribers { public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. /// /// The logger. /// The TCP service. /// The service provider. /// The message received login event repository. /// The message received heartbeat event repository. /// The device repository. public SubscriberAppService(ILogger logger, ITcpService tcpService, IServiceProvider serviceProvider, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordsRepository) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository; _dbProvider = dbProvider; } [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] public async Task LoginIssuedEvent(List issuedEventMessages) { bool isAck = false; foreach (var issuedEventMessage in issuedEventMessages) { var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); if (loginEntity == null) { isAck=false; break; } _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); if (_tcpService.ClientExists(issuedEventMessage.ClientId)) await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); isAck = true; } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] public async Task HeartbeatIssuedEvent(List issuedEventMessages) { bool isAck = false; foreach (var issuedEventMessage in issuedEventMessages) { var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); if (heartbeatEntity == null) { isAck = false; break; } _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); //if (device != null) //{ // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); //} if(_tcpService.ClientExists(issuedEventMessage.ClientId)) await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } // TODO:暂时ACK,等后续处理是否放到私信队列中 return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] public async Task ReceivedEvent(MessageReceived receivedMessage) { var currentTime = Clock.Now; var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { //todo 会根据不同的协议进行解析,然后做业务处理 TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); if (tB3761 == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (tB3761.DT == null || tB3761.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } //报文入库 var entity = new MeterReadingRecords() { ReceivedMessageHexString = receivedMessage.MessageHexString, AFN = (AFN)tB3761.AFN_FC.AFN, Fn = tB3761.DT.Fn, Pn = 0, FocusAddress = "", MeterAddress = "", }; //如果没数据,则插入,有数据则更新 var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime); if (updateEntity == null) { await _meterReadingRecordsRepository.InsertAsync(entity, currentTime); } //_dbProvider.InsertAsync(); //todo 查找是否有下发任务 //await _messageReceivedEventRepository.InsertAsync(receivedMessage); } return SubscribeAck.Success(); } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { //foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) //{ // var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); // if (protocolPlugin == null) // { // _logger.LogError("协议不存在!"); // } // else // { // //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); // await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); // } //} await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); return SubscribeAck.Success(); } [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName,EnableBatch =true)] //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(List receivedLoginMessages) { //foreach (var receivedLoginMessage in receivedLoginMessages) //{ //var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); //if (protocolPlugin == null) //{ // _logger.LogError("协议不存在!"); //} //else //{ // //await protocolPlugin.LoginAsync(receivedLoginMessage); // await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); //} //} await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages); return SubscribeAck.Success(); } [KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] public async Task ReceivedAFN00Event(MessageReceived receivedMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); if (tB3761 == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (tB3761.DT == null || tB3761.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); AFN0_F1_AnalysisDto aFN0_F1_AnalysisDto= await executor.ExecuteAsync("AFN0_F1_Analysis", tB3761); } return SubscribeAck.Success(); } } }