2025-04-25 09:31:16 +08:00

507 lines
21 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.SendData;
using Mapster;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol
{
/// <summary>
/// Protocol plugin that parses received 376.1 frames, answers login and heartbeat frames over TCP,
/// publishes the received and issued messages through the producer service, and builds downlink packets.
/// </summary>
public class StandardProtocolPlugin : ProtocolPlugin
{
// Logger used for parse failures, duplicate-device warnings and reply traces.
private readonly ILogger<StandardProtocolPlugin> _logger;
// Producer used to publish received / issued messages to topics.
private readonly IProducerService _producerService;
// Repository queried by device number and updated on login / heartbeat.
private readonly IRepository<Device, Guid> _deviceRepository;
// TCP service used to check whether a client exists and to send reply bytes to it.
private readonly ITcpService _tcpService;
/// <summary>
/// Packet-builder delegates looked up by handler name in <c>BuildAsync</c>;
/// assigned from <c>Telemetry3761PacketBuilder.T3761AFNHandlers</c> in the constructor.
/// </summary>
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
/// <summary>
/// Packet-builder delegates looked up by handler name in <c>BuildAsync</c> for the sub-protocol request;
/// assigned from <c>Telemetry645PacketBuilder.T645ControlHandlers</c> in the constructor.
/// </summary>
public readonly Dictionary<string, Telemetry645PacketBuilder.T645Delegate> T645ControlHandlers;
/// <summary>
/// Creates the plugin, storing the injected collaborators and resolving the remaining
/// services from the service provider.
/// </summary>
/// <param name="serviceProvider">Provider from which the producer service and the device repository are resolved; also passed to the base class.</param>
/// <param name="logger">Logger stored by this class and passed to the base class.</param>
/// <param name="tcpService">TCP service stored for sending replies to connected clients.</param>
public StandardProtocolPlugin(IServiceProvider serviceProvider, ILogger<StandardProtocolPlugin> logger, ITcpService tcpService)
    : base(serviceProvider, logger)
{
    // Collaborators handed in directly.
    _tcpService = tcpService;
    _logger = logger;

    // Collaborators resolved from the container (GetRequiredService throws when a registration is missing).
    _producerService = serviceProvider.GetRequiredService<IProducerService>();
    _deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();

    // Handler tables exposed by the packet builders.
    T645ControlHandlers = Telemetry645PacketBuilder.T645ControlHandlers;
    T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
}
/// <summary>
/// Descriptor of this protocol plugin. A new <see cref="ProtocolInfo"/> instance is created on every access.
/// </summary>
public sealed override ProtocolInfo Info
{
    get
    {
        return new ProtocolInfo(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
    }
}
/// <summary>
/// Parses a received message with <c>Analysis3761</c>. When the frame belongs to the link-interface AFN,
/// login and heartbeat frames are answered via <see cref="LoginAsync"/> / <see cref="HeartbeatAsync"/>;
/// every successfully parsed frame is then passed to <c>OnTcpNormalReceived</c>.
/// </summary>
/// <typeparam name="T">Type the parsed <see cref="TB3761"/> frame is converted to with <c>as</c>.</typeparam>
/// <param name="client">TCP session the message was received on.</param>
/// <param name="messageReceived">The received message handed to <c>Analysis3761</c>.</param>
/// <param name="sendAction">Not used by this implementation.</param>
/// <returns>
/// The parsed frame converted to <typeparamref name="T"/>. The result is null when parsing returned null
/// or the frame is not a <typeparamref name="T"/>, even though the null-forgiving operator hides that from the compiler.
/// </returns>
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{
TB3761? tB3761 = Analysis3761(messageReceived);
if (tB3761 != null)
{
// NOTE(review): the enum member names after `AFN.`, `FN.` and `CON.` in this method are missing
// in this copy of the file; judging by the log message below this first check selects the
// link-interface-detection AFN — restore the identifiers from the original source.
if (tB3761.AFN_FC?.AFN == (int)AFN.)
{
// Address code, A3.D1_D7 and PSEQ are all needed to build the reply; without them only an error is logged.
if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
{
_logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
}
else
{
if (tB3761.DT?.Fn == (int)FN.)
{
// Login reply: only sent when the frame's CON flag matches the (missing) CON member below.
if (tB3761.SEQ.CON == (int)CON.)
await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
else if (tB3761.DT?.Fn == (int)FN.)
{
// Heartbeat reply.
// Heartbeat frames occur in two situations:
// 1. The concentrator sends a login frame first and heartbeat frames afterwards.
// 2. The concentrator sends no login frame, only heartbeat frames.
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
}
}
// Runs for every parsed frame; login / heartbeat frames are filtered out inside the callee.
await OnTcpNormalReceived(client, tB3761);
}
return (tB3761 as T)!;
}
/// <summary>
/// Handles a normally parsed frame: frames other than login / heartbeat are wrapped in a
/// <c>MessageProtocolAnalysis</c> and published to the topic derived from the frame's AFN.
/// </summary>
/// <param name="tcpSessionClient">TCP session the frame was received on; supplies client ID, IP and port.</param>
/// <param name="tB3761">The parsed 376.1 frame.</param>
/// <returns>A task that completes once the message has been published, or immediately when the frame is skipped.</returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761)
{
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
// TODO: how should the time mark be determined? The collection frequency of concentrators is currently fixed,
// so when data is reported the expected collection time point is inferred from the current time and used as
// the time mark. How should it be calculated when data is not reported for a long time because of network problems?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{
// ClientId = client.Id,
// ClientIp = client.IP,
// ClientPort = client.Port,
// MessageHexString = messageHexString,
// DeviceNo = deviceNo,
// MessageId = NewId.NextGuid().ToString()
//});
// NOTE(review): AFN_FC, DT and BaseHexMessage are dereferenced here without null checks, while
// AnalyzeAsync reads AFN_FC and DT with `?.` — confirm these members cannot be null at this point.
if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null)
{
_logger.LogError("376.1协议解析AFN失败");
return;
}
// Login and heartbeat frames have already been handled by the caller, so they are ignored here.
// NOTE(review): the enum member names after `FN.` are missing in this copy of the file.
if (tB3761.DT.Fn == (int)FN. || tB3761.DT.Fn == (int)FN.)
return;
// TODO: route the message to a Kafka topic according to the AFN.
// The topic name is AFNTopicNameFormat filled with the AFN left-padded with '0' to two characters.
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0'));
MessageProtocolAnalysis<TB3761> messageReceivedAnalysis = new MessageProtocolAnalysis<TB3761>()
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = tB3761.BaseHexMessage.HexMessageString!,
DeviceNo = tB3761.A.Code!,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Data = tB3761
};
// Publish to the AFN topic when it is one of the known receive topics; otherwise log the
// unsupported topic and publish to the general received-event topic instead.
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if (topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, messageReceivedAnalysis);
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
}
/// <summary>
/// Handles a login frame: renames the session to the concentrator code, inserts or updates the device
/// record, publishes the received login message, and sends the login reply to the client.
/// </summary>
/// <param name="client">TCP session the login frame was received on.</param>
/// <param name="messageReceived">The received message, stored in the published login event.</param>
/// <param name="code">Concentrator code; used as the new session ID, the device number and the reply address.</param>
/// <param name="msa">MSA value for the reply. Must have a value: <c>msa!.Value</c> throws when it is null.</param>
/// <param name="pseq">Sequence number for the reply. Must have a value: <c>pseq!.Value</c> throws when it is null.</param>
/// <returns>A task that completes when the login has been processed.</returns>
public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
// Remember the session ID before it is replaced by the concentrator code.
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
// TODO: push a duplicate-concentrator-number alert.
// Note that this returns after the session ID was already reset and without sending a reply.
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null)
{
// Unknown device: create it, passing the pre-reset session ID and marking it online.
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = code,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
// Parameters of the reply frame.
// NOTE(review): the enum member names after `AFN.`, `CFromStationFunCode.`, `PRM.`, `TpV.`,
// `FIRFIN.` and `CON.` are missing in this copy of the file — restore them from the original source.
var reqParam = new ReqParameter2
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
var issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedLoginEvent.ClientId,
DeviceNo = messageReceivedLoginEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
// The reply is sent (and the issued event published) only while the TCP service still knows the client.
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// Handles a heartbeat frame: inserts or updates the device record, publishes the received heartbeat
/// message, and sends the heartbeat reply to the client.
/// </summary>
/// <param name="client">TCP session the heartbeat frame was received on.</param>
/// <param name="messageReceived">The received message, stored in the published heartbeat event.</param>
/// <param name="code">Concentrator code; used as client ID, device number and reply address.</param>
/// <param name="msa">MSA value for the reply. Must have a value: <c>msa!.Value</c> throws when it is null.</param>
/// <param name="pseq">Sequence number for the reply. Must have a value: <c>pseq!.Value</c> throws when it is null.</param>
/// <returns>A task that completes when the heartbeat has been processed.</returns>
public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string clientId = code;
string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
// TODO: push a duplicate-concentrator-number alert.
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null) // Device that sent no login frame, only heartbeat frames.
{
// The session is renamed to the concentrator code only in this branch.
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
// Pass the current session ID along only when it differs from the concentrator code.
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
// Parameters of the reply frame.
// NOTE(review): the enum member names after `AFN.`, `CFromStationFunCode.`, `PRM.`, `TpV.`,
// `FIRFIN.` and `CON.` are missing in this copy of the file — restore them from the original source.
var reqParam = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value,
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Heartbeat,
MessageId = messageReceivedHeartbeatEvent.MessageId
};
// The reply is sent (and the issued event published) only while the TCP service knows the client.
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
// NOTE(review): logged at Warning level, whereas the login reply is logged at Information — confirm this is intended.
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// Builds a downlink packet for the requested item code.
/// </summary>
/// <param name="request">
/// Build parameters. <c>ItemCode</c> is split on <c>'_'</c>: the first segment is the AFN as a hex string
/// and the second segment is the Fn as a decimal integer.
/// </param>
/// <returns>
/// The builder result adapted to a <see cref="ProtocolBuildResponse"/> with <c>IsSuccess</c> set to true,
/// or a default-constructed response when no handler is registered for the AFN or the handler returned null.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
/// <exception cref="ArgumentException"><c>ItemCode</c> is empty or does not contain at least two '_'-separated segments.</exception>
public override async Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request)
{
    if (request == null)
    {
        throw new ArgumentNullException(nameof(request), $"{nameof(StandardProtocolPlugin)} 报文构建失败,参数为空");
    }

    // Reject malformed item codes up front instead of failing later with a
    // NullReferenceException / IndexOutOfRangeException that hides the real cause.
    if (string.IsNullOrWhiteSpace(request.ItemCode))
    {
        throw new ArgumentException($"{nameof(StandardProtocolPlugin)}: {nameof(request.ItemCode)} must not be empty.", nameof(request));
    }

    var itemCodeArr = request.ItemCode.Split('_');
    if (itemCodeArr.Length < 2)
    {
        throw new ArgumentException($"{nameof(StandardProtocolPlugin)}: {nameof(request.ItemCode)} must contain an AFN and an Fn separated by '_', but was \"{request.ItemCode}\".", nameof(request));
    }

    var aFNStr = itemCodeArr[0];
    // The converted AFN value itself is not needed; the conversion is kept because how HexToDec
    // reacts to a non-hex segment is not visible here and callers may rely on it.
    _ = (AFN)aFNStr.HexToDec();
    var fn = int.Parse(itemCodeArr[1]);

    List<string> dataUnit = new List<string>();

    // Data-forwarding scenario (10H_F1_1CH): the data unit comes from the sub-protocol handler.
    var subRequest = request.SubProtocolRequest;
    if (aFNStr == "10" && subRequest != null && !string.IsNullOrWhiteSpace(subRequest.ItemCode))
    {
        var t645PacketHandlerName = $"C{subRequest.ItemCode}_Send";
        if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName, out var t645PacketHandler))
        {
            var t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest()
            {
                MeterAddress = subRequest.MeterAddress,
                Password = subRequest.Password,
                ItemCode = subRequest.ItemCode,
            });
            if (t645PacketResponse != null)
            {
                dataUnit = t645PacketResponse.Data;
            }
        }
    }

    Telemetry3761PacketResponse? builderResponse = null;
    string afnMethonCode = $"AFN{aFNStr}_Fn_Send";
    if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode, out var handler))
    {
        builderResponse = handler(new Telemetry3761PacketRequest()
        {
            FocusAddress = request.FocusAddress,
            Fn = fn,
            Pn = request.Pn,
            DataUnit = dataUnit,
        });
    }

    // No handler for this AFN (or the handler produced nothing): return a response without IsSuccess set.
    if (builderResponse == null)
    {
        return new ProtocolBuildResponse();
    }

    var result = builderResponse.Adapt<ProtocolBuildResponse>();
    result.IsSuccess = true;
    // Awaiting a completed task keeps the method a valid async method without introducing real asynchrony.
    return await Task.FromResult(result);
}
#region
// Sample uplink frame (login), byte by byte:
//68
//32 00
//32 00
//68
//C9 1100'1001. Control field C.
// D7=1, uplink direction (sent by the terminal).
// D6=1, this frame comes from the initiating station.
// D5=0, (uplink direction) access-request bit: the terminal has no event data waiting to be accessed.
// D4=0, reserved
// D3~D0=9, function code: link test
//20 32 administrative region code
//90 26 terminal address
//00 master station address and group address flag. The terminal uses a single address. //3220 09 87 2
// The MSA of a frame initiated by the terminal should be 0, and the MSA of the master station's response frame should also be 0.
//02 application-layer function code. AFN=2, link interface detection
//70 0111'0000. Frame sequence field. No time tag, single frame, confirmation required.
//00 00 information point. DA1 and DA2 both being "0" denotes the terminal information point.
//01 00 information class. F1, login.
//44 frame tail, containing the checksum of the user data area
//16 end-of-frame flag
/// <summary>
/// Parses an uplink command given as a hex string.
/// </summary>
/// <param name="cmd">The frame as a hex string; it is split into two-character pairs before parsing.</param>
/// <returns>
/// The parsed command, or null when the frame is shorter than the header, a start marker is missing,
/// or the encoded length does not match the number of bytes received.
/// </returns>
public CommandReulst? AnalysisCmd(string cmd)
{
    var pairs = cmd.StringToPairs();

    // Too short to contain the fixed header.
    if (pairs.Count < hearderLen)
    {
        return null;
    }

    // Both start markers (positions 0 and 5) must be present.
    if (!(pairs[0].IsStartStr() && pairs[5].IsStartStr()))
    {
        return null;
    }

    // Length field: bytes 1 and 2 swapped, converted to binary, last two bits dropped.
    var lengthBits = $"{pairs[2]}{pairs[1]}".HexToBin();
    var len = lengthBits.Remove(lengthBits.Length - 2).BinToDec();

    // Length check: everything except the two trailing bytes must equal header + user data length.
    if (pairs.Count - 2 != hearderLen + len)
    {
        return null;
    }

    // Positions of the individual fields inside the user data area.
    var controlAt = hearderLen;      // control field, 1 byte
    var addressAt = controlAt + 1;   // address field, 5 bytes
    var afnAt = addressAt + 5;       // AFN, 1 byte
    var seqAt = afnAt + 1;           // SEQ, 1 byte
    var da1At = seqAt + 1;
    var da2At = da1At + 1;
    var dt1At = da2At + 1;
    var dt2At = dt1At + 1;
    var dataAt = dt2At + 1;          // first byte of the data unit

    // The control field is read but its value is not used further.
    var controlField = pairs[controlAt];

    var addressBytes = pairs.Skip(addressAt).Take(5).ToList();
    var address = AnalysisA(addressBytes);
    // MSA: the upper seven bits of the fifth address byte.
    var msaValue = addressBytes[4].HexToBin().PadLeft(8, '0').Substring(0, 7).BinToDec();

    var afnValue = (AFN)pairs[afnAt].HexToDec();

    // SEQ byte as eight binary digits: TpV (1), FIR/FIN (2), CON (1), sequence number (4).
    var seqBits = pairs[seqAt].HexToBin().PadLeft(8, '0');
    var timeTagFlag = (TpV)Convert.ToInt32(seqBits.Substring(0, 1));
    // NOTE(review): Convert.ToInt32 reads the two FIR/FIN digits as a decimal number ("10" -> 10,
    // "11" -> 11), so the FIRFIN enum is presumably numbered accordingly — confirm.
    var firFinFlag = (FIRFIN)Convert.ToInt32(seqBits.Substring(1, 2));
    var confirmFlag = (CON)Convert.ToInt32(seqBits.Substring(3, 1));
    var sequenceBits = seqBits.Substring(4, 4);

    // (DA2 - 1) * 8 + DA1 = pn, where DA1 is taken as the length of its binary representation.
    var da1Bits = pairs[da1At].HexToBin();
    int da1 = 0;
    if (da1Bits != "0")
    {
        da1 = da1Bits.Length;
    }
    var da2 = pairs[da2At].HexToDec();
    var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;

    // (DT2 * 8) + DT1 = fn, where DT1 is taken as the length of its binary representation.
    var dt1Bits = pairs[dt1At].HexToBin();
    int dt1 = 0;
    if (dt1Bits != "0")
    {
        dt1 = dt1Bits.Length;
    }
    var dt2 = pairs[dt2At].HexToDec();
    var fn = dt2 * 8 + dt1;

    // Data unit: everything between the DT field and the end of the user data area.
    // EC and Tp are not parsed.
    var dataUnit = pairs.Skip(dataAt).Take(len + hearderLen - dataAt).ToList();

    return new CommandReulst()
    {
        A = address,
        MSA = msaValue,
        AFN = afnValue,
        Seq = new Seq()
        {
            TpV = timeTagFlag,
            FIRFIN = firFinFlag,
            CON = confirmFlag,
            PRSEQ = sequenceBits.BinToDec(),
        },
        CmdLength = len,
        Pn = pn,
        Fn = fn,
        HexDatas = dataUnit
    };
}
/// <summary>
/// Builds the address string from the five address-field bytes.
/// </summary>
/// <param name="aHexList">The five address bytes as two-character hex strings, in frame order.</param>
/// <returns>
/// Bytes 1 and 0 concatenated as hex, followed by bytes 3 and 2 converted to decimal and
/// left-padded with '0' to five digits.
/// </returns>
private string AnalysisA(List<string> aHexList)
{
    // First part: the first two bytes, swapped, kept as hex text.
    var firstPart = aHexList[1] + aHexList[0];

    // Second part: the next two bytes, swapped, converted to decimal.
    var secondPartDec = (aHexList[3] + aHexList[2]).HexToDec();

    // The fifth byte does not contribute to the result; it is still read so that a list
    // with fewer than five entries fails here exactly as before.
    _ = aHexList[4];

    return firstPart + secondPartDec.ToString().PadLeft(5, '0');
}
#endregion
}
}