using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol3761; using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; using JiShe.CollectBus.Protocol; using TouchSocket.Sockets; using JiShe.CollectBus.Protocol.Dto; namespace JiShe.CollectBus.Subscribers { public class SubscriberAnalysisAppService : CollectBusAppService, ISubscriberAnalysisAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IIoTDbProvider _dbProvider; private readonly IProtocolService _protocolService; public SubscriberAnalysisAppService(ILogger logger, ITcpService tcpService, IServiceProvider serviceProvider, IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; _meterReadingRecordsRepository = meterReadingRecordsRepository; _dbProvider = dbProvider; _protocolService = protocolService; } /// /// 解析AFN00H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)] public async Task ReceivedAFN00Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data==null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); } return SubscribeAck.Fail(); } /// /// 解析AFN01H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)] public async Task ReceivedAFN01Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); } return SubscribeAck.Fail(); } /// /// 解析AFN02H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] public async Task ReceivedAFN02Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data, (result) => { var ssss = (UnitDataAnalysis)result; }); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN03H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)] public async Task ReceivedAFN03Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN04H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)] public async Task ReceivedAFN04Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN05H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)] public async Task ReceivedAFN05Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN09H /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)] public async Task ReceivedAFN09Event(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0AH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)] public async Task ReceivedAFN0AEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0BH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)] public async Task ReceivedAFN0BEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0CH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)] public async Task ReceivedAFN0CEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0DH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)] public async Task ReceivedAFN0DEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0EH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)] public async Task ReceivedAFN0EEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } /// /// 解析AFN0HH /// /// /// [KafkaSubscribe(ProtocolConst.SubscriberAFN10HReceivedEventNameTemp)] public async Task ReceivedAFN0HEvent(MessageProtocolAnalysis receivedMessage) { var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); if (protocolPlugin == null) { _logger.LogError("协议不存在!"); } else { if (receivedMessage.Data == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); //var data = await analysisStrategy.ExecuteAsync>(tB3761); var executor = _serviceProvider.GetRequiredService(); await executor.ExecuteAsync(serverName, receivedMessage.Data); return SubscribeAck.Success(); } return SubscribeAck.Fail(); } } }