diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs index 51336bf..e2c7229 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs @@ -101,7 +101,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); if (baseHexMessage.HexMessageList.Count == 0) return null; - string binStr = baseHexMessage.HexMessageString.HexToBin(); + string binStr = baseHexMessage.HexMessageString.HexTo4BinZero(); C c = new C { BaseHexMessage = baseHexMessage, @@ -176,7 +176,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); if (baseHexMessage.HexMessageList.Count == 0) return null; - var binStr = baseHexMessage.HexMessageString.HexToBin(); + var binStr = baseHexMessage.HexMessageString.HexTo4BinZero(); A3 a3 = new A3 { BaseHexMessage = baseHexMessage, @@ -238,7 +238,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); if (baseHexMessage.HexMessageList.Count == 0) return null; - var binStr = baseHexMessage.HexMessageString.HexToBin(); + var binStr = baseHexMessage.HexMessageString.HexTo4BinZero(); SEQ seq = new SEQ { PSEQ = binStr.Substring(binStr.Length - 4, 4).BinToDec(), @@ -354,7 +354,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// /// /// - public int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexToBin().IndexOf(da1.Equals("00") ? "0" : "1")); + public int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexTo4BinZero().IndexOf(da1.Equals("00") ? "0" : "1")); /// /// 计算Fn @@ -362,7 +362,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// /// /// - public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexToBin().IndexOf("1")); + public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1")); } } diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 8f7af53..6c7d4ea 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -55,18 +55,19 @@ namespace JiShe.CollectBus.Protocol } else { - if (tB3761.SEQ.CON == 1) + if (tB3761.DT?.Fn == (int)FN.登录) { - if (tB3761.DT?.Fn == 1) - { - // 登录回复 + // 登录回复 + if (tB3761.SEQ.CON == (int)CON.需要对该帧进行确认) await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); - } - else if (tB3761.DT?.Fn == 2) - { - // 心跳回复 - await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); - } + } + else if (tB3761.DT?.Fn == (int)FN.心跳) + { + // 心跳回复 + //心跳帧有两种情况: + //1. 集中器先有登录帧,再有心跳帧 + //2. 集中器没有登录帧,只有心跳帧 + await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); } } @@ -89,9 +90,7 @@ namespace JiShe.CollectBus.Protocol public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) { string oldClientId = $"{client.Id}"; - await client.ResetIdAsync(code); - var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); if (deviceInfoList != null && deviceInfoList.Count > 1) { @@ -128,7 +127,8 @@ namespace JiShe.CollectBus.Protocol //await _producerBus.Publish( messageReceivedLoginEvent); - + //var aTuple = (Tuple)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.A); + //var seq = (Seq)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.SEQ); var reqParam = new ReqParameter2 { AFN = AFN.确认或否认, @@ -166,7 +166,6 @@ namespace JiShe.CollectBus.Protocol string clientId = code; string oldClientId = $"{client.Id}"; - var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); if (deviceInfoList != null && deviceInfoList.Count > 1) { diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 7e2650f..e631416 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -71,49 +71,16 @@ namespace JiShe.CollectBus.Plugins _logger.LogError("协议不存在!"); } - protocolPlugin.Analysis3761(messageHexString); - TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync(tcpSessionClient, messageHexString); - - - - //var hexStringList = messageHexString.StringToPairs(); - //var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); - //var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); - //var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); - //if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1)) - //{ - //var tcpSessionClient = (ITcpSessionClient)client; - - //if ((AFN)aFn == AFN.链路接口检测) - //{ - // switch (fn) - // { - // case 1: - // await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1); - // break; - // case 3: - // //心跳帧有两种情况: - // //1. 集中器先有登录帧,再有心跳帧 - // //2. 集中器没有登录帧,只有心跳帧 - // await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1); - // break; - // default: - // _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); - // break; - // } - //} - //else - //{ - //await OnTcpNormalReceived(tcpSessionClient, messageHexString); // , aTuple.Item1, aFn.ToString()!.PadLeft(2, '0') - //} - //} - //else - //{ - // _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); - //} - - await e.InvokeNext(); + if (tB3761 == null) + { + _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); + } + else + { + await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761); + } + await e.InvokeNext(); } //[GeneratorPlugin(typeof(ITcpConnectingPlugin))] @@ -154,122 +121,20 @@ namespace JiShe.CollectBus.Plugins await e.InvokeNext(); } - /// - /// 登录帧处理 - /// - /// - /// - /// 集中器编号 - /// - private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo) - { - string oldClientId = $"{client.Id}"; - - await client.ResetIdAsync(deviceNo); - - var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo); - if (deviceInfoList != null && deviceInfoList.Count > 1) - { - //todo 推送集中器编号重复预警 - _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); - return; - } - - var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); - if (entity == null) - { - await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); - } - else - { - entity.UpdateByLoginAndHeartbeat(oldClientId); - await _deviceRepository.UpdateAsync(entity); - } - - var messageReceivedLoginEvent = new MessageReceivedLogin - { - ClientId = deviceNo, - ClientIp = client.IP, - ClientPort = client.Port, - MessageHexString = messageHexString, - DeviceNo = deviceNo, - MessageId = Guid.NewGuid().ToString() - }; - - //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); - - - await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); - - //await _producerBus.Publish( messageReceivedLoginEvent); - } - - private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo) - { - string clientId = deviceNo; - string oldClientId = $"{client.Id}"; - - var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo); - if (deviceInfoList != null && deviceInfoList.Count > 1) - { - //todo 推送集中器编号重复预警 - _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); - return; - } - - var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); - if (entity == null) //没有登录帧的设备,只有心跳帧 - { - await client.ResetIdAsync(clientId); - await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); - } - else - { - if (clientId != oldClientId) - { - entity.UpdateByLoginAndHeartbeat(oldClientId); - } - else - { - entity.UpdateByLoginAndHeartbeat(); - } - - await _deviceRepository.UpdateAsync(entity); - } - - var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat - { - ClientId = clientId, - ClientIp = client.IP, - ClientPort = client.Port, - MessageHexString = messageHexString, - DeviceNo = deviceNo, - MessageId = Guid.NewGuid().ToString() - }; - //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); - - await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); - //await _producerBus.Publish(messageReceivedHeartbeatEvent); - } - /// /// 正常帧处理,将不同的AFN进行分发 /// - /// + /// /// - /// - /// + /// /// - private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, string messageHexString) //, string deviceNo,string aFn + private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761) { - var _protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + var _protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (_protocolPlugin == null) { - _logger.LogError("协议不存在!"); + _logger.LogError("376.1协议插件不存在!"); } - TB3761? tB3761 = await _protocolPlugin!.AnalyzeAsync(tcpSessionClient, messageHexString); - - //await _producerBus.Publish(new MessageReceived @@ -296,17 +161,15 @@ namespace JiShe.CollectBus.Plugins //}); //TODO:根据AFN进行分流推送到kafka - - - //await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived - //{ - // ClientId = client.Id, - // ClientIp = client.IP, - // ClientPort = client.Port, - // MessageHexString = messageHexString, - // DeviceNo = deviceNo, - // MessageId = Guid.NewGuid().ToString() - //}); + await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + { + ClientId = tcpSessionClient.Id, + ClientIp = tcpSessionClient.IP, + ClientPort = tcpSessionClient.Port, + MessageHexString = messageHexString, + DeviceNo = tB3761?.A?.Code!, + MessageId = Guid.NewGuid().ToString() + }); } } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 17a6120..b15b99b 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -69,63 +69,30 @@ namespace JiShe.CollectBus.Subscribers [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] public async Task LoginIssuedEvent(List issuedEventMessages) - { - bool isAck = false; + { foreach (var issuedEventMessage in issuedEventMessages) { - switch (issuedEventMessage.Type) - { - case IssuedEventType.Heartbeat: - break; - case IssuedEventType.Login: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - loginEntity.AckTime = Clock.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); - } - - //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - //if (device != null) - //{ - // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); - //} - + _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); + var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + loginEntity.AckTime = Clock.Now; + loginEntity.IsAck = true; + await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); + return SubscribeAck.Success(); } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] public async Task HeartbeatIssuedEvent(List issuedEventMessages) { - bool isAck = false; foreach (var issuedEventMessage in issuedEventMessages) { - switch (issuedEventMessage.Type) - { - case IssuedEventType.Heartbeat: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - heartbeatEntity.AckTime = Clock.Now; - heartbeatEntity.IsAck = true; - await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - isAck = true; - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); - } - + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); + var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + heartbeatEntity.AckTime = Clock.Now; + heartbeatEntity.IsAck = true; + await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); //if (device != null) //{ @@ -134,8 +101,8 @@ namespace JiShe.CollectBus.Subscribers await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); + + return SubscribeAck.Success(); } [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] @@ -152,7 +119,7 @@ namespace JiShe.CollectBus.Subscribers else { //todo 会根据不同的协议进行解析,然后做业务处理 - TB3761 tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); + TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); if (tB3761 == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); diff --git a/shared/JiShe.CollectBus.Common/Enums/376Enums.cs b/shared/JiShe.CollectBus.Common/Enums/376Enums.cs index 68df1da..6ea182d 100644 --- a/shared/JiShe.CollectBus.Common/Enums/376Enums.cs +++ b/shared/JiShe.CollectBus.Common/Enums/376Enums.cs @@ -368,4 +368,9 @@ namespace JiShe.CollectBus.Common.Enums HardwareReleaseDate=38 } + public enum FN + { + 登录 = 1, + 心跳 = 3 + } } diff --git a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs index 8eeb1ff..b2722a8 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs @@ -1178,24 +1178,25 @@ namespace JiShe.CollectBus.Common.Extensions return decimalNumber; } - ///// - ///// 十六进制转二进制 - ///// - ///// - ///// - //public static string HexToBin(this string hexString) - //{ - // var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2); - // return binaryValue; - //} - - /// /// 十六进制转二进制 /// /// /// public static string HexToBin(this string hexString) + { + var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2); + return binaryValue; + } + + + /// + /// 十六进制转二进制 + /// 不足4位,前面补0 + /// + /// + /// + public static string HexTo4BinZero(this string hexString) { string result = string.Empty; foreach (char c in hexString) diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index afe25da..f7bd68e 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,6 +16,7 @@ 后端服务 +