using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;

namespace JiShe.CollectBus.Subscribers
{
    /// <summary>
    /// Application service whose public methods are registered as Kafka subscribers
    /// through <c>[KafkaSubscribe]</c>. It acknowledges issued login/heartbeat messages,
    /// persists received login/heartbeat messages, and stores parsed 3761 frames as
    /// meter reading records.
    /// </summary>
    public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe
    {
        private readonly ILogger _logger;
        private readonly ITcpService _tcpService;
        private readonly IServiceProvider _serviceProvider;
        private readonly IRepository _messageReceivedLoginEventRepository;
        private readonly IRepository _messageReceivedHeartbeatEventRepository;
        private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
        private readonly IIoTDbProvider _dbProvider;
        private readonly IProtocolService _protocolService;

        /// <summary>
        /// Initializes a new instance of the <see cref="SubscriberAppService"/> class
        /// and stores every dependency in its matching private field.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <param name="tcpService">The TCP service (stored; not used by the methods in this class).</param>
        /// <param name="serviceProvider">The service provider used to resolve the keyed protocol plugin.</param>
        /// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
        /// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
        /// <param name="dbProvider">The IoTDB provider (stored; not used by the methods in this class).</param>
        /// <param name="meterReadingRecordsRepository">The meter reading record repository.</param>
        /// <param name="protocolService">The protocol service (stored; not used by the methods in this class).</param>
        public SubscriberAppService(ILogger logger,
            ITcpService tcpService,
            IServiceProvider serviceProvider,
            IRepository messageReceivedLoginEventRepository,
            IRepository messageReceivedHeartbeatEventRepository,
            IIoTDbProvider dbProvider,
            IMeterReadingRecordRepository meterReadingRecordsRepository,
            IProtocolService protocolService)
        {
            _logger = logger;
            _tcpService = tcpService;
            _serviceProvider = serviceProvider;
            _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
            _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
            _meterReadingRecordsRepository = meterReadingRecordsRepository;
            _dbProvider = dbProvider;
            _protocolService = protocolService;
        }

        /// <summary>
        /// Marks the stored login records matching a batch of issued login messages as acknowledged.
        /// </summary>
        /// <param name="issuedEventMessages">The batch of issued login messages to acknowledge.</param>
        /// <returns>
        /// <c>SubscribeAck.Success()</c> when every message in the batch matched a stored record;
        /// <c>SubscribeAck.Fail()</c> when a record is missing (processing stops at the first miss)
        /// or when the batch is empty.
        /// </returns>
        [LogIntercept]
        [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)]
        public async Task LoginIssuedEvent(List issuedEventMessages)
        {
            bool isAck = false;
            foreach (var issuedEventMessage in issuedEventMessages)
            {
                var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
                if (loginEntity == null)
                {
                    // No stored record for this message: fail the whole batch.
                    isAck = false;
                    break;
                }

                loginEntity.AckTime = Clock.Now;
                loginEntity.IsAck = true;
                await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
                isAck = true;
            }

            // TODO: ACK for now; decide later whether such messages should be moved to a
            // dead-letter queue (the original note literally says "private-message queue",
            // presumably a typo - confirm).
            return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
        }

        /// <summary>
        /// Marks the stored heartbeat records matching a batch of issued heartbeat messages as acknowledged.
        /// </summary>
        /// <param name="issuedEventMessages">The batch of issued heartbeat messages to acknowledge.</param>
        /// <returns>
        /// <c>SubscribeAck.Success()</c> when every message in the batch matched a stored record;
        /// <c>SubscribeAck.Fail()</c> when a record is missing (processing stops at the first miss)
        /// or when the batch is empty.
        /// </returns>
        [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
        public async Task HeartbeatIssuedEvent(List issuedEventMessages)
        {
            bool isAck = false;
            foreach (var issuedEventMessage in issuedEventMessages)
            {
                var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
                if (heartbeatEntity == null)
                {
                    // No stored record for this message: fail the whole batch.
                    isAck = false;
                    break;
                }

                heartbeatEntity.AckTime = Clock.Now;
                heartbeatEntity.IsAck = true;
                await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);

                // Record the successful update, mirroring LoginIssuedEvent. Without this
                // assignment the flag stays false and the method always reports failure.
                isAck = true;
            }

            // TODO: ACK for now; decide later whether such messages should be moved to a
            // dead-letter queue (the original note literally says "private-message queue",
            // presumably a typo - confirm).
            return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
        }

        /// <summary>
        /// Parses a received message with the "StandardProtocolPlugin" protocol plugin and
        /// inserts a meter reading record for it when the repository lookup finds none.
        /// </summary>
        /// <param name="receivedMessage">The received message carrying the frame as a hex string.</param>
        /// <returns>
        /// Always <c>SubscribeAck.Success()</c>: a missing plugin or a parse failure is logged
        /// as an error and the message is still acknowledged.
        /// </returns>
        [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
        public async Task ReceivedEvent(MessageReceived receivedMessage)
        {
            var currentTime = Clock.Now;

            var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
            if (protocolPlugin == null)
            {
                _logger.LogError("协议不存在!");
                return SubscribeAck.Success();
            }

            // TODO: parse according to the specific protocol, then run the business handling.
            TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);

            // A frame without a parse result, DT, or AFN_FC cannot be stored; log and acknowledge it.
            if (tB3761 == null || tB3761.DT == null || tB3761.AFN_FC == null)
            {
                Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
                return SubscribeAck.Success();
            }

            // Build the record to persist from the parsed frame.
            var entity = new MeterReadingRecords()
            {
                ReceivedMessageHexString = receivedMessage.MessageHexString,
                AFN = (AFN)tB3761.AFN_FC.AFN,
                Fn = tB3761.DT.Fn,
                Pn = 0,
                FocusAddress = "",
                MeterAddress = "",
            };

            // Insert only when the lookup finds no existing record.
            // NOTE(review): the original comment says an existing record should be updated,
            // but no update is performed here - confirm the intended behavior.
            var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
            if (updateEntity == null)
            {
                await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
            }

            //_dbProvider.InsertAsync();
            // TODO: look up whether there is a matching issued task.
            //await _messageReceivedEventRepository.InsertAsync(receivedMessage);

            return SubscribeAck.Success();
        }

        /// <summary>
        /// Persists a batch of received heartbeat messages.
        /// </summary>
        /// <param name="receivedHeartbeatMessages">The heartbeat messages to insert.</param>
        /// <returns><c>SubscribeAck.Success()</c> once the insert call has completed.</returns>
        [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName, EnableBatch = true)]
        public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages)
        {
            await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);

            return SubscribeAck.Success();
        }

        /// <summary>
        /// Persists a batch of received login messages.
        /// </summary>
        /// <param name="receivedLoginMessages">The login messages to insert.</param>
        /// <returns><c>SubscribeAck.Success()</c> once the insert call has completed.</returns>
        [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName, EnableBatch = true)]
        public async Task ReceivedLoginEvent(List receivedLoginMessages)
        {
            await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);

            return SubscribeAck.Success();
        }
    }
}