diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs index 0c50f6e..054b5ab 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Services/ProtocolService.cs @@ -12,6 +12,7 @@ using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using System.Text.RegularExpressions; using Volo.Abp; +using Microsoft.Extensions.Logging; namespace JiShe.CollectBus.Protocol.Contracts.Services { @@ -19,11 +20,13 @@ namespace JiShe.CollectBus.Protocol.Contracts.Services { private readonly IFreeRedisProvider _freeRedisProvider; private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; - public ProtocolService(IFreeRedisProvider freeRedisProvider, IServiceProvider serviceProvider) + public ProtocolService(IFreeRedisProvider freeRedisProvider, IServiceProvider serviceProvider, ILogger logger) { _freeRedisProvider = freeRedisProvider; _serviceProvider = serviceProvider; + _logger= logger; } /// @@ -61,6 +64,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Services } catch (Exception ex) { + _logger.LogError(ex, "获取协议失败"); return null; } } diff --git a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs index d1249b7..d313bcd 100644 --- a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs +++ b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs @@ -1,15 +1,7 @@ -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Protocol.Dto; using Microsoft.Extensions.Logging; -using Microsoft.IdentityModel.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H { @@ -32,16 +24,17 @@ namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H ArgumentNullException.ThrowIfNull(nameof(tB3761)); UnitDataAnalysis dto = new UnitDataAnalysis { - Code = tB3761.A?.Code, - AFN = tB3761.AFN_FC?.AFN ?? 0, - Fn = tB3761.DT?.Fn ?? 0, - Pn = tB3761.DA?.Pn ?? 0 + Code = tB3761.A.Code, + AFN = tB3761.AFN_FC.AFN, + Fn = tB3761.DT.Fn, + Pn = tB3761.DA.Pn, + Data= true }; return Task.FromResult(dto); } catch (Exception ex) { - _logger.LogError(ex, $"00_1解析失败:{tB3761.A?.Code}-{tB3761.DT?.Fn ?? 0}-{tB3761?.BaseHexMessage?.HexMessageString},{ex.Message}"); + _logger.LogError(ex, $"00_1解析失败:{tB3761.A.Code}-{tB3761.DT.Fn}-{tB3761.BaseHexMessage.HexMessageString},{ex.Message}"); return null; } } diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 8009165..462921a 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -79,8 +79,6 @@ namespace JiShe.CollectBus.Protocol // 注册策略实现 services.AddTransient(analysisStrategyType); strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType; - - } } diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 094f9e8..695434d 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -73,13 +73,68 @@ namespace JiShe.CollectBus.Protocol await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); } } - - } + } + await OnTcpNormalReceived(client, tB3761); } return (tB3761 as T)!; } + /// + /// 正常帧处理,将不同的AFN进行分发 + /// + /// + /// + /// + /// + private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761) + { + //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); + //todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算? + //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + //{ + // ClientId = client.Id, + // ClientIp = client.IP, + // ClientPort = client.Port, + // MessageHexString = messageHexString, + // DeviceNo = deviceNo, + // MessageId = NewId.NextGuid().ToString() + //}); + + if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null) + { + _logger.LogError("376.1协议解析AFN失败"); + return; + } + // 登录心跳已做了处理,故需要忽略登录和心跳帧 + if (tB3761.DT.Fn == (int)FN.登录 || tB3761.DT.Fn == (int)FN.心跳) + return; + //TODO:根据AFN进行分流推送到kafka + string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0')); + + MessageProtocolAnalysis messageReceivedAnalysis = new MessageProtocolAnalysis() + { + ClientId = tcpSessionClient.Id, + ClientIp = tcpSessionClient.IP, + ClientPort = tcpSessionClient.Port, + MessageHexString = tB3761.BaseHexMessage.HexMessageString!, + DeviceNo = tB3761.A.Code!, + MessageId = Guid.NewGuid().ToString(), + ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), + Data = tB3761 + }; + + List topics = ProtocolConstExtensions.GetAllTopicNamesByReceived(); + if (topics.Contains(topicName)) + await _producerService.ProduceAsync(topicName, messageReceivedAnalysis); + else + { + _logger.LogError($"不支持的上报kafka主题:{topicName}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis); + } + + } + /// /// 登录回复 @@ -119,18 +174,10 @@ namespace JiShe.CollectBus.Protocol ClientPort = client.Port, MessageHexString = messageReceived, DeviceNo = code, - MessageId = Guid.NewGuid().ToString() + MessageId = Guid.NewGuid().ToString(), + ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") }; - - //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); - - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); - - //await _producerBus.Publish( messageReceivedLoginEvent); - - //var aTuple = (Tuple)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.A); - //var seq = (Seq)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.SEQ); var reqParam = new ReqParameter2 { AFN = AFN.确认或否认, @@ -156,15 +203,12 @@ namespace JiShe.CollectBus.Protocol Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId }; - //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); if (_tcpService.ClientExists(issuedEventMessage.ClientId)) { await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}"); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage); } - - //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } /// @@ -215,7 +259,8 @@ namespace JiShe.CollectBus.Protocol ClientPort = client.Port, MessageHexString = messageReceived, DeviceNo = code, - MessageId = Guid.NewGuid().ToString() + MessageId = Guid.NewGuid().ToString(), + ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") }; await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); @@ -254,11 +299,6 @@ namespace JiShe.CollectBus.Protocol } } - - - - - #region 上行命令 //68 diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAnalysisAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAnalysisAppService.cs new file mode 100644 index 0000000..ec12ab6 --- /dev/null +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAnalysisAppService.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Subscribers +{ + public interface ISubscriberAnalysisAppService + { + + } +} diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index e9ec23b..badf6de 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -77,13 +77,10 @@ namespace JiShe.CollectBus.Plugins TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync(tcpSessionClient, messageHexString); if (tB3761 == null) { + // TODO: 暂时不处理,后续再处理 _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); } - else - { - await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761); - } - await e.InvokeNext(); + await e.InvokeNext(); } //[GeneratorPlugin(typeof(ITcpConnectingPlugin))] @@ -124,76 +121,5 @@ namespace JiShe.CollectBus.Plugins await e.InvokeNext(); } - /// - /// 正常帧处理,将不同的AFN进行分发 - /// - /// - /// - /// - /// - private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761) - { - //await _producerBus.Publish(new MessageReceived - //{ - // ClientId = client.Id, - // ClientIp = client.IP, - // ClientPort = client.Port, - // MessageHexString = messageHexString, - // DeviceNo = deviceNo, - // MessageId = NewId.NextGuid().ToString() - //}); - - - //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); - //todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算? - //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived - //{ - // ClientId = client.Id, - // ClientIp = client.IP, - // ClientPort = client.Port, - // MessageHexString = messageHexString, - // DeviceNo = deviceNo, - // MessageId = NewId.NextGuid().ToString() - //}); - - if(tB3761?.AFN_FC.BaseHexMessage==null || tB3761.DT.BaseHexMessage == null) - { - _logger.LogError("376.1协议解析AFN失败"); - return; - } - // 登录心跳已做了处理,故需要忽略登录和心跳帧 - //if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳) - // return; - - //TODO:根据AFN进行分流推送到kafka - string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761?.AFN_FC.AFN.ToString().PadLeft(2,'0')); - - List topics = ProtocolConstExtensions.GetAllTopicNamesByReceived(); - - if(topics.Contains(topicName)) - await _producerService.ProduceAsync(topicName, new MessageReceived - { - ClientId = tcpSessionClient.Id, - ClientIp = tcpSessionClient.IP, - ClientPort = tcpSessionClient.Port, - MessageHexString = messageHexString, - DeviceNo = tB3761?.A?.Code!, - MessageId = Guid.NewGuid().ToString() - }); - else - { - _logger.LogError($"不支持的上报kafka主题:{topicName}"); - await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived - { - ClientId = tcpSessionClient.Id, - ClientIp = tcpSessionClient.IP, - ClientPort = tcpSessionClient.Port, - MessageHexString = messageHexString, - DeviceNo = tB3761?.A?.Code!, - MessageId = Guid.NewGuid().ToString() - }); - } - - } } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs new file mode 100644 index 0000000..319792a --- /dev/null +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs @@ -0,0 +1,83 @@ +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IoTDB.Interface; +using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Repository.MeterReadingRecord; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using TouchSocket.Sockets; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.Subscribers +{ + public class SubscriberAnalysisAppService : CollectBusAppService, ISubscriberAnalysisAppService, IKafkaSubscribe + { + private readonly ILogger _logger; + private readonly ITcpService _tcpService; + private readonly IServiceProvider _serviceProvider; + private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; + private readonly IIoTDbProvider _dbProvider; + private readonly IProtocolService _protocolService; + + public SubscriberAnalysisAppService(ILogger logger, + ITcpService tcpService, + IServiceProvider serviceProvider, + IIoTDbProvider dbProvider, + IMeterReadingRecordRepository meterReadingRecordsRepository, IProtocolService protocolService) + { + _logger = logger; + _tcpService = tcpService; + _serviceProvider = serviceProvider; + _meterReadingRecordsRepository = meterReadingRecordsRepository; + _dbProvider = dbProvider; + _protocolService = protocolService; + } + + /// + /// 解析AFN00H + /// + /// + /// + [KafkaSubscribe(ProtocolConst.SubscriberAFN00HReceivedEventNameTemp)] + public async Task ReceivedAFN00Event(MessageProtocolAnalysis receivedMessage) + { + var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + if (receivedMessage.Data==null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return SubscribeAck.Success(); + } + if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return SubscribeAck.Success(); + } + string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis"; + //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + + //var data = await analysisStrategy.ExecuteAsync>(tB3761); + var executor = _serviceProvider.GetRequiredService(); + await executor.ExecuteAsync(serverName, receivedMessage.Data); + } + + return SubscribeAck.Fail(); + } + + } +} diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 5b3b6a0..f8a880c 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -169,7 +169,6 @@ namespace JiShe.CollectBus.Subscribers public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); - return SubscribeAck.Success(); } @@ -180,40 +179,5 @@ namespace JiShe.CollectBus.Subscribers return SubscribeAck.Success(); } - - //[KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)] - public async Task ReceivedAFN00Event(MessageReceived receivedMessage) - { - - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); - if (tB3761 == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - if (tB3761.DT == null || tB3761.AFN_FC == null) - { - Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return SubscribeAck.Success(); - } - string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis"; - //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); - - //var data = await analysisStrategy.ExecuteAsync>(tB3761); - var executor = _serviceProvider.GetRequiredService(); - bool isSucces= await executor.ExecuteAsync("AFN0_F1_Analysis", tB3761); - } - - return SubscribeAck.Fail(); - } - - } } diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MessageReceiveds/MessageReceived.cs b/services/JiShe.CollectBus.Domain/IotSystems/MessageReceiveds/MessageReceived.cs index 4fad589..f194d00 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MessageReceiveds/MessageReceived.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MessageReceiveds/MessageReceived.cs @@ -50,4 +50,9 @@ namespace JiShe.CollectBus.IotSystems.MessageReceiveds public DateTime? AckTime { get; set; } } + + public class MessageProtocolAnalysis : MessageReceived + { + public T Data { get; set; } = default!; + } }