优化tcp接受消息处理

This commit is contained in:
zenghongyao 2025-04-22 09:34:59 +08:00
parent 83efc6da00
commit 503931f297
7 changed files with 75 additions and 239 deletions

View File

@ -101,7 +101,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
if (baseHexMessage.HexMessageList.Count == 0) if (baseHexMessage.HexMessageList.Count == 0)
return null; return null;
string binStr = baseHexMessage.HexMessageString.HexToBin(); string binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
C c = new C C c = new C
{ {
BaseHexMessage = baseHexMessage, BaseHexMessage = baseHexMessage,
@ -176,7 +176,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
if (baseHexMessage.HexMessageList.Count == 0) if (baseHexMessage.HexMessageList.Count == 0)
return null; return null;
var binStr = baseHexMessage.HexMessageString.HexToBin(); var binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
A3 a3 = new A3 A3 a3 = new A3
{ {
BaseHexMessage = baseHexMessage, BaseHexMessage = baseHexMessage,
@ -238,7 +238,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList);
if (baseHexMessage.HexMessageList.Count == 0) if (baseHexMessage.HexMessageList.Count == 0)
return null; return null;
var binStr = baseHexMessage.HexMessageString.HexToBin(); var binStr = baseHexMessage.HexMessageString.HexTo4BinZero();
SEQ seq = new SEQ SEQ seq = new SEQ
{ {
PSEQ = binStr.Substring(binStr.Length - 4, 4).BinToDec(), PSEQ = binStr.Substring(binStr.Length - 4, 4).BinToDec(),
@ -354,7 +354,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// <param name="da1"></param> /// <param name="da1"></param>
/// <param name="da2"></param> /// <param name="da2"></param>
/// <returns></returns> /// <returns></returns>
public int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexToBin().IndexOf(da1.Equals("00") ? "0" : "1")); public int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexTo4BinZero().IndexOf(da1.Equals("00") ? "0" : "1"));
/// <summary> /// <summary>
/// 计算Fn /// 计算Fn
@ -362,7 +362,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// <param name="dt1"></param> /// <param name="dt1"></param>
/// <param name="dt2"></param> /// <param name="dt2"></param>
/// <returns></returns> /// <returns></returns>
public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexToBin().IndexOf("1")); public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1"));
} }
} }

View File

@ -55,18 +55,19 @@ namespace JiShe.CollectBus.Protocol
} }
else else
{ {
if (tB3761.SEQ.CON == 1) if (tB3761.DT?.Fn == (int)FN.)
{ {
if (tB3761.DT?.Fn == 1) // 登录回复
{ if (tB3761.SEQ.CON == (int)CON.)
// 登录回复
await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
} }
else if (tB3761.DT?.Fn == 2) else if (tB3761.DT?.Fn == (int)FN.)
{ {
// 心跳回复 // 心跳回复
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); //心跳帧有两种情况:
} //1. 集中器先有登录帧,再有心跳帧
//2. 集中器没有登录帧,只有心跳帧
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
} }
} }
@ -89,9 +90,7 @@ namespace JiShe.CollectBus.Protocol
public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq)
{ {
string oldClientId = $"{client.Id}"; string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code); await client.ResetIdAsync(code);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1) if (deviceInfoList != null && deviceInfoList.Count > 1)
{ {
@ -128,7 +127,8 @@ namespace JiShe.CollectBus.Protocol
//await _producerBus.Publish( messageReceivedLoginEvent); //await _producerBus.Publish( messageReceivedLoginEvent);
//var aTuple = (Tuple<string, int>)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.A);
//var seq = (Seq)messageReceived.StringToPairs().GetAnalyzeValue(CommandChunkEnum.SEQ);
var reqParam = new ReqParameter2 var reqParam = new ReqParameter2
{ {
AFN = AFN., AFN = AFN.,
@ -166,7 +166,6 @@ namespace JiShe.CollectBus.Protocol
string clientId = code; string clientId = code;
string oldClientId = $"{client.Id}"; string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1) if (deviceInfoList != null && deviceInfoList.Count > 1)
{ {

View File

@ -71,49 +71,16 @@ namespace JiShe.CollectBus.Plugins
_logger.LogError("协议不存在!"); _logger.LogError("协议不存在!");
} }
protocolPlugin.Analysis3761(messageHexString);
TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString); TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString);
if (tB3761 == null)
{
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
//var hexStringList = messageHexString.StringToPairs(); }
//var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); else
//var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); {
//var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761);
//if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1)) }
//{ await e.InvokeNext();
//var tcpSessionClient = (ITcpSessionClient)client;
//if ((AFN)aFn == AFN.链路接口检测)
//{
// switch (fn)
// {
// case 1:
// await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1);
// break;
// case 3:
// //心跳帧有两种情况:
// //1. 集中器先有登录帧,再有心跳帧
// //2. 集中器没有登录帧,只有心跳帧
// await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1);
// break;
// default:
// _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
// break;
// }
//}
//else
//{
//await OnTcpNormalReceived(tcpSessionClient, messageHexString); // , aTuple.Item1, aFn.ToString()!.PadLeft(2, '0')
//}
//}
//else
//{
// _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
//}
await e.InvokeNext();
} }
//[GeneratorPlugin(typeof(ITcpConnectingPlugin))] //[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
@ -154,122 +121,20 @@ namespace JiShe.CollectBus.Plugins
await e.InvokeNext(); await e.InvokeNext();
} }
/// <summary>
/// 登录帧处理
/// </summary>
/// <param name="client"></param>
/// <param name="messageHexString"></param>
/// <param name="deviceNo">集中器编号</param>
/// <returns></returns>
private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(deviceNo);
var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = deviceNo,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
DeviceNo = deviceNo,
MessageId = Guid.NewGuid().ToString()
};
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
//await _producerBus.Publish( messageReceivedLoginEvent);
}
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
string clientId = deviceNo;
string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageHexString,
DeviceNo = deviceNo,
MessageId = Guid.NewGuid().ToString()
};
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
//await _producerBus.Publish(messageReceivedHeartbeatEvent);
}
/// <summary> /// <summary>
/// 正常帧处理将不同的AFN进行分发 /// 正常帧处理将不同的AFN进行分发
/// </summary> /// </summary>
/// <param name="client"></param> /// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param> /// <param name="messageHexString"></param>
/// <param name="deviceNo"></param> /// <param name="tB3761"></param>
/// <param name="aFn"></param>
/// <returns></returns> /// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, string messageHexString) //, string deviceNo,string aFn private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761)
{ {
var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (_protocolPlugin == null) if (_protocolPlugin == null)
{ {
_logger.LogError("协议不存在!"); _logger.LogError("376.1协议插件不存在!");
} }
TB3761? tB3761 = await _protocolPlugin!.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString);
//await _producerBus.Publish(new MessageReceived //await _producerBus.Publish(new MessageReceived
@ -296,17 +161,15 @@ namespace JiShe.CollectBus.Plugins
//}); //});
//TODO根据AFN进行分流推送到kafka //TODO根据AFN进行分流推送到kafka
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
//await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived ClientId = tcpSessionClient.Id,
//{ ClientIp = tcpSessionClient.IP,
// ClientId = client.Id, ClientPort = tcpSessionClient.Port,
// ClientIp = client.IP, MessageHexString = messageHexString,
// ClientPort = client.Port, DeviceNo = tB3761?.A?.Code!,
// MessageHexString = messageHexString, MessageId = Guid.NewGuid().ToString()
// DeviceNo = deviceNo, });
// MessageId = Guid.NewGuid().ToString()
//});
} }
} }
} }

View File

@ -70,62 +70,29 @@ namespace JiShe.CollectBus.Subscribers
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false;
foreach (var issuedEventMessage in issuedEventMessages) foreach (var issuedEventMessage in issuedEventMessages)
{ {
switch (issuedEventMessage.Type) _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
{ var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
case IssuedEventType.Heartbeat: loginEntity.AckTime = Clock.Now;
break; loginEntity.IsAck = true;
case IssuedEventType.Login: await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
default:
throw new ArgumentOutOfRangeException();
}
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null)
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
} }
return SubscribeAck.Success();
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
} }
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages) public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{ {
bool isAck = false;
foreach (var issuedEventMessage in issuedEventMessages) foreach (var issuedEventMessage in issuedEventMessages)
{ {
switch (issuedEventMessage.Type) _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
{ var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
case IssuedEventType.Heartbeat: heartbeatEntity.AckTime = Clock.Now;
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); heartbeatEntity.IsAck = true;
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
isAck = true;
break;
case IssuedEventType.Data:
break;
default:
throw new ArgumentOutOfRangeException();
}
//var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo);
//if (device != null) //if (device != null)
//{ //{
@ -135,7 +102,7 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
} }
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); return SubscribeAck.Success();
} }
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
@ -152,7 +119,7 @@ namespace JiShe.CollectBus.Subscribers
else else
{ {
//todo 会根据不同的协议进行解析,然后做业务处理 //todo 会根据不同的协议进行解析,然后做业务处理
TB3761 tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
if (tB3761 == null) if (tB3761 == null)
{ {
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");

View File

@ -368,4 +368,9 @@ namespace JiShe.CollectBus.Common.Enums
HardwareReleaseDate=38 HardwareReleaseDate=38
} }
public enum FN
{
= 1,
= 3
}
} }

View File

@ -1178,24 +1178,25 @@ namespace JiShe.CollectBus.Common.Extensions
return decimalNumber; return decimalNumber;
} }
///// <summary>
///// 十六进制转二进制
///// </summary>
///// <param name="hexString"></param>
///// <returns></returns>
//public static string HexToBin(this string hexString)
//{
// var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2);
// return binaryValue;
//}
/// <summary> /// <summary>
/// 十六进制转二进制 /// 十六进制转二进制
/// </summary> /// </summary>
/// <param name="hexString"></param> /// <param name="hexString"></param>
/// <returns></returns> /// <returns></returns>
public static string HexToBin(this string hexString) public static string HexToBin(this string hexString)
{
var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2);
return binaryValue;
}
/// <summary>
/// 十六进制转二进制
/// 不足4位前面补0
/// </summary>
/// <param name="hexString"></param>
/// <returns></returns>
public static string HexTo4BinZero(this string hexString)
{ {
string result = string.Empty; string result = string.Empty;
foreach (char c in hexString) foreach (char c in hexString)

View File

@ -16,6 +16,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/> <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/> <link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>