diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin_bak.cs similarity index 88% rename from protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs rename to protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin_bak.cs index e49d1be..92c680b 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin_bak.cs @@ -15,10 +15,10 @@ using JiShe.CollectBus.Common.Consts; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { - public abstract class BaseProtocolPlugin : IProtocolPlugin + public abstract class BaseProtocolPlugin_bak //: IProtocolPlugin { private readonly IProducerService _producerService; - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; //头部字节长度 @@ -29,13 +29,13 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts public const string errorData = "EE"; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// The service provider. - protected BaseProtocolPlugin(IServiceProvider serviceProvider) + protected BaseProtocolPlugin_bak(IServiceProvider serviceProvider) { - _logger = serviceProvider.GetRequiredService>(); + _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); } @@ -603,149 +603,149 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts var time = Appendix.Appendix_A1(hexDatas.Take(6).ToList()); } - /// - /// 通用解析 - /// - /// - /// - /// - public virtual TB3761 AnalyzeReadingDataAsync(MessageReceived messageReceived, - Action? sendAction = null) - { - var hexStringList = messageReceived.MessageHexString.StringToPairs(); - var afn = (AFN)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); - var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); + ///// + ///// 通用解析 + ///// + ///// + ///// + ///// + //public virtual TB3761 AnalyzeReadingDataAsync(MessageReceived messageReceived, + // Action? sendAction = null) + //{ + // var hexStringList = messageReceived.MessageHexString.StringToPairs(); + // var afn = (AFN)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); + // var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); - var tb3761 = QGDW3761Config.CommandList.FirstOrDefault(it => it.Afn == afn); - if (tb3761 == null) return null; + // var tb3761 = QGDW3761Config.CommandList.FirstOrDefault(it => it.Afn == afn); + // if (tb3761 == null) return null; - var tb3761Fn = tb3761.FnList.FirstOrDefault(it => it.Fn == fn); - if (tb3761Fn == null) return null; + // var tb3761Fn = tb3761.FnList.FirstOrDefault(it => it.Fn == fn); + // if (tb3761Fn == null) return null; - var analyzeValue = (List)hexStringList.GetAnalyzeValue(CommandChunkEnum.Data); + // var analyzeValue = (List)hexStringList.GetAnalyzeValue(CommandChunkEnum.Data); - var m = 0; - var rateNumberUpSort = -1; - var rateNumberUp = tb3761Fn.UpList.FirstOrDefault(it => it.Name.Contains("费率数M")); - if (rateNumberUp != null) - { - var rateNumber = analyzeValue.Skip(rateNumberUp.DataIndex).Take(rateNumberUp.DataCount).FirstOrDefault(); - m = Convert.ToInt32(rateNumber); - rateNumberUpSort = rateNumberUp.Sort; - } + // var m = 0; + // var rateNumberUpSort = -1; + // var rateNumberUp = tb3761Fn.UpList.FirstOrDefault(it => it.Name.Contains("费率数M")); + // if (rateNumberUp != null) + // { + // var rateNumber = analyzeValue.Skip(rateNumberUp.DataIndex).Take(rateNumberUp.DataCount).FirstOrDefault(); + // m = Convert.ToInt32(rateNumber); + // rateNumberUpSort = rateNumberUp.Sort; + // } - foreach (var up in tb3761Fn.UpList) - { - var dataIndex = up.DataIndex; - if (dataIndex == 0 && up.Sort > rateNumberUpSort) - { - var sum1 = tb3761Fn.UpList.Where(it => it.Sort < up.Sort) - .Sum(it => it.DataCount); - var sum2 = tb3761Fn.UpList.Where(it => it.Sort < up.Sort && it.Tb3761UpChildlList.Count > 0) - .Sum(it => it.Tb3761UpChildlList.Sum(c=> m * c.DataCount)); - dataIndex = sum1 + sum2; - } + // foreach (var up in tb3761Fn.UpList) + // { + // var dataIndex = up.DataIndex; + // if (dataIndex == 0 && up.Sort > rateNumberUpSort) + // { + // var sum1 = tb3761Fn.UpList.Where(it => it.Sort < up.Sort) + // .Sum(it => it.DataCount); + // var sum2 = tb3761Fn.UpList.Where(it => it.Sort < up.Sort && it.Tb3761UpChildlList.Count > 0) + // .Sum(it => it.Tb3761UpChildlList.Sum(c=> m * c.DataCount)); + // dataIndex = sum1 + sum2; + // } - var value = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, up.DataCount, up.DataType); - if (value != null) - { - up.Value = value.ToString(); - } - if (up.Tb3761UpChildlList.Count > 0) //复费率根据费率数来解析 - { - var repeatCount = m; - foreach (var upChild in up.Tb3761UpChildlList) - { - for (var j = 0; j < repeatCount; j++) - { - var val = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, upChild.DataCount, upChild.DataType); - if (val != null) - { - upChild.Name = string.Format(upChild.Name, j + 1); - upChild.Value = val.ToString(); - } - dataIndex += upChild.DataCount; - } - } + // var value = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, up.DataCount, up.DataType); + // if (value != null) + // { + // up.Value = value.ToString(); + // } + // if (up.Tb3761UpChildlList.Count > 0) //复费率根据费率数来解析 + // { + // var repeatCount = m; + // foreach (var upChild in up.Tb3761UpChildlList) + // { + // for (var j = 0; j < repeatCount; j++) + // { + // var val = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, upChild.DataCount, upChild.DataType); + // if (val != null) + // { + // upChild.Name = string.Format(upChild.Name, j + 1); + // upChild.Value = val.ToString(); + // } + // dataIndex += upChild.DataCount; + // } + // } - } - } + // } + // } - return tb3761; - } + // return tb3761; + //} - /// - /// 通用解析 日冻结曲线类 - /// - /// - /// - /// - public virtual TB3761 AnalyzeReadingTdcDataAsync(MessageReceived messageReceived, - Action? sendAction = null) - { + ///// + ///// 通用解析 日冻结曲线类 + ///// + ///// + ///// + ///// + //public virtual TB3761 AnalyzeReadingTdcDataAsync(MessageReceived messageReceived, + // Action? sendAction = null) + //{ - var hexStringList = messageReceived.MessageHexString.StringToPairs(); - var afn = (AFN)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); - var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); + // var hexStringList = messageReceived.MessageHexString.StringToPairs(); + // var afn = (AFN)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); + // var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); - var tb3761 = QGDW3761Config.CommandTdcList.FirstOrDefault(it => it.Afn == afn); - if (tb3761 == null) return null; + // var tb3761 = QGDW3761Config.CommandTdcList.FirstOrDefault(it => it.Afn == afn); + // if (tb3761 == null) return null; - var tb3761Fn = tb3761.FnList.FirstOrDefault(it => it.Fn == fn); - if (tb3761Fn == null) return null; + // var tb3761Fn = tb3761.FnList.FirstOrDefault(it => it.Fn == fn); + // if (tb3761Fn == null) return null; - var analyzeValue = (List)hexStringList.GetAnalyzeValue(CommandChunkEnum.Data); + // var analyzeValue = (List)hexStringList.GetAnalyzeValue(CommandChunkEnum.Data); - foreach (var up in tb3761Fn.UpList) - { - var value = AnalyzeDataAccordingDataType(analyzeValue, up.DataIndex, up.DataCount, up.DataType); - if (value != null) - { - up.Value = value.ToString(); + // foreach (var up in tb3761Fn.UpList) + // { + // var value = AnalyzeDataAccordingDataType(analyzeValue, up.DataIndex, up.DataCount, up.DataType); + // if (value != null) + // { + // up.Value = value.ToString(); - if (up.Tb3761UpChildlList.Count > 0) - { - var dataIndex = up.DataIndex; - var repeatCount = (int)value; - foreach (var upChild in up.Tb3761UpChildlList) - { - for (var j = 0; j < repeatCount; j++) - { - var val = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, upChild.DataCount, upChild.DataType); - if (val != null) - { - upChild.Value = val.ToString(); - upChild.Name = string.Format(upChild.Name, j + 1); - } - dataIndex += upChild.DataCount; - } - } - } - } - } + // if (up.Tb3761UpChildlList.Count > 0) + // { + // var dataIndex = up.DataIndex; + // var repeatCount = (int)value; + // foreach (var upChild in up.Tb3761UpChildlList) + // { + // for (var j = 0; j < repeatCount; j++) + // { + // var val = AnalyzeDataAccordingDataType(analyzeValue, dataIndex, upChild.DataCount, upChild.DataType); + // if (val != null) + // { + // upChild.Value = val.ToString(); + // upChild.Name = string.Format(upChild.Name, j + 1); + // } + // dataIndex += upChild.DataCount; + // } + // } + // } + // } + // } - return tb3761; - //var freezeDensity = (FreezeDensity)Convert.ToInt32(hexDatas.Skip(5).Take(1)); - //var addMinute = 0; - //switch (freezeDensity) - //{ - // case FreezeDensity.No:break; - // case FreezeDensity.Min15: - // addMinute = 15; - // break; - // case FreezeDensity.Min30: - // addMinute = 30; - // break; - // case FreezeDensity.Min60: - // addMinute = 60; - // break; - // case FreezeDensity.Min5: break; - // addMinute = 5; - // case FreezeDensity.Min1: - // addMinute = 1; - // break; - // } - } + // return tb3761; + // //var freezeDensity = (FreezeDensity)Convert.ToInt32(hexDatas.Skip(5).Take(1)); + // //var addMinute = 0; + // //switch (freezeDensity) + // //{ + // // case FreezeDensity.No:break; + // // case FreezeDensity.Min15: + // // addMinute = 15; + // // break; + // // case FreezeDensity.Min30: + // // addMinute = 30; + // // break; + // // case FreezeDensity.Min60: + // // addMinute = 60; + // // break; + // // case FreezeDensity.Min5: break; + // // addMinute = 5; + // // case FreezeDensity.Min1: + // // addMinute = 1; + // // break; + // // } + //} private object? AnalyzeDataAccordingDataType(List analyzeValue, int dataIndex,int dataCount,string dataType) { diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs new file mode 100644 index 0000000..51336bf --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs @@ -0,0 +1,368 @@ +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.IotSystems.Protocols; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using TouchSocket.Sockets; +using Volo.Abp.Domain.Repositories; + +namespace JiShe.CollectBus.Protocol.Contracts.Abstracts +{ + public abstract class ProtocolPlugin:IProtocolPlugin + { + //头部字节长度 + public const int hearderLen = 6; + + public const int tPLen = 6; + + public const string errorData = "EE"; + + private readonly ILogger _logger; + private readonly IRepository _protocolInfoRepository; + public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger) + { + _logger = logger; + _protocolInfoRepository = serviceProvider.GetRequiredService>(); + + } + + + public abstract ProtocolInfo Info { get; } + + public virtual async Task GetAsync() => await Task.FromResult(Info); + + public virtual async Task AddAsync() + { + if (Info == null) + { + throw new ArgumentNullException(nameof(Info)); + } + + await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); + await _protocolInfoRepository.InsertAsync(Info); + //await _protocolInfoCache.Get() + } + + public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T :class; + + + /// + /// 解析376.1帧 + /// + /// + /// + public virtual TB3761? Analysis3761(string messageReceived) + { + try + { + var hexStringList = messageReceived.StringToPairs(); + // 初步校验 + if (hexStringList.Count < 6 || hexStringList.FirstOrDefault() != "68" || hexStringList.Skip(5).Take(1).FirstOrDefault() != "68" || hexStringList.Count < 18 || hexStringList.LastOrDefault() != "16") + { + _logger.LogError($"解析Analysis3761校验不通过,报文:{messageReceived}"); + } + else + { + TB3761 tB3761 = new TB3761 + { + C = Analysis_C(hexStringList), + A = Analysis_A(hexStringList), + AFN_FC = Analysis_AFN_FC(hexStringList), + SEQ = Analysis_SEQ(hexStringList), + UnitData = Analysis_UnitData(hexStringList), + DA = Analysis_DA(hexStringList), + DT = Analysis_DT(hexStringList) + }; + return tB3761; + } + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis3761错误,报文:{messageReceived},异常:{ex.Message}"); + } + return null; + } + + /// + /// 控制域C解析 + /// + /// + public virtual C? Analysis_C(List hexStringList) + { + try + { + if (hexStringList.Count > 6) + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(6, 1) // 控制域 1字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + string binStr = baseHexMessage.HexMessageString.HexToBin(); + C c = new C + { + BaseHexMessage = baseHexMessage, + FC = binStr.Substring(binStr.Length - 4, 4).BinToDec(), + FCV = binStr.Substring(3, 1).BinToDec(), + FCB = binStr.Substring(2, 1).BinToDec(), + PRM = binStr.Substring(1, 1).BinToDec(), + DIR = binStr.Substring(0, 1).BinToDec() + }; + return c; + } + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_C错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + + return null; + } + + /// + /// 地址域A解析 + /// + /// + /// + public virtual A? Analysis_A(List hexStringList) + { + try + { + if (hexStringList.Count > 7) + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(7, 5) // 地址域 5个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + A a = new A + { + BaseHexMessage = baseHexMessage, + A1 = baseHexMessage.HexMessageList.ListReverseToStr(0, 2),//.DataConvert(10);//行政区划码A1 + A2 = baseHexMessage.HexMessageList.ListReverseToStr(2, 2).PadLeft(5, '0').HexToDec(),//终端地址A2 + A3 = Analysis_A3(baseHexMessage.HexMessageList) //主站地址和组地址标志A3 + }; + a.Code = $"{a.A1.PadLeft(4, '0')}{a.A2.ToString().PadLeft(5, '0')}"; + return a; + } + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_A错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + + return null; + + } + + /// + /// 站地址和组地址标志A3 + /// + /// 地址域A集合 + /// + public virtual A3? Analysis_A3(List hexAList) + { + try + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexAList.GetRange(4, 1) // 站地址和组地址标志A3 1个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + var binStr = baseHexMessage.HexMessageString.HexToBin(); + A3 a3 = new A3 + { + BaseHexMessage = baseHexMessage, + D0 = binStr.Substring(binStr.Length - 1, 1).BinToDec(), + D1_D7 = binStr.Substring(0, binStr.Length - 1).BinToDec() + }; + return a3; + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_A3错误,报文:{string.Join("", hexAList)},异常:{ex.Message}"); + } + + return null; + } + + /// + /// AFN_FC功能码 + /// + /// + public virtual AFN_FC? Analysis_AFN_FC(List hexStringList) + { + try + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(12, 1) //AFN功能码 1个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + AFN_FC aFN_FC = new AFN_FC + { + BaseHexMessage = baseHexMessage, + AFN = baseHexMessage.HexMessageString.HexToDec(), + }; + return aFN_FC; + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_AFN_FC错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + + return null; + } + + /// + /// 解析帧序列域SEQ + /// + /// + public virtual SEQ? Analysis_SEQ(List hexStringList) + { + try + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(13, 1) //帧序列域 SEQ 1个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + var binStr = baseHexMessage.HexMessageString.HexToBin(); + SEQ seq = new SEQ + { + PSEQ = binStr.Substring(binStr.Length - 4, 4).BinToDec(), + CON = binStr.Substring(3, 1).BinToDec(), + FIN = binStr.Substring(2, 1).BinToDec(), + FIR = binStr.Substring(1, 1).BinToDec(), + TpV = binStr.Substring(0, 1).BinToDec() + }; + return seq; + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_SEQ错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + return null; + } + + + /// + /// 数据单元标识及数据单元数据 + /// + public virtual UnitData? Analysis_UnitData(List hexStringList) + { + try + { + + UnitData unitData = new UnitData + { + HexMessageList = hexStringList.GetRange(14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符(16H) + }; + unitData.HexMessageString = string.Join("", unitData.HexMessageList); + if (unitData.HexMessageList.Count == 0) + return null; + return unitData; + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_UnitData错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + return null; + } + + /// + /// 信息点DA Pn + /// + /// + public virtual DA? Analysis_DA(List hexStringList) + { + try + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(14, 2) //信息点DA Pn 2个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + var da1 = baseHexMessage.HexMessageList[0]; + var da2 = baseHexMessage.HexMessageList[1]; + DA da = new DA() + { + BaseHexMessage = baseHexMessage, + Pn = CalculatePn(da1, da2) + }; + return da; + + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_DA错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + return null; + } + + /// + /// 信息类DT Fn + /// + /// + public virtual DT? Analysis_DT(List hexStringList) + { + try + { + BaseHexMessage baseHexMessage = new BaseHexMessage + { + HexMessageList = hexStringList.GetRange(16, 2) //信息类DT Fn 2个字节 + }; + baseHexMessage.HexMessageString = string.Join("", baseHexMessage.HexMessageList); + if (baseHexMessage.HexMessageList.Count == 0) + return null; + var dt1 = baseHexMessage.HexMessageList[0]; + var dt2 = baseHexMessage.HexMessageList[1]; + DT dt = new DT() + { + BaseHexMessage = baseHexMessage, + Fn = CalculateFn(dt1, dt2) + }; + return dt; + + } + catch (Exception ex) + { + _logger.LogError($"解析Analysis_DT错误,报文:{string.Join("", hexStringList)},异常:{ex.Message}"); + } + + return null; + + } + + + /// + /// 计算Pn + /// + /// + /// + /// + public int CalculatePn(string da1, string da2) => (da2.HexToDec() - 1) * 8 + (8 - da1.HexToBin().IndexOf(da1.Equals("00") ? "0" : "1")); + + /// + /// 计算Fn + /// + /// + /// + /// + public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexToBin().IndexOf("1")); + + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 2f48cd2..37e71e8 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -14,10 +14,12 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces Task AddAsync(); - Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761; + Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? sendAction = null) where T : class; - Task LoginAsync(MessageReceivedLogin messageReceived); + TB3761? Analysis3761(string messageReceived); - Task HeartbeatAsync(MessageReceivedHeartbeat messageReceived); + //Task LoginAsync(MessageReceivedLogin messageReceived); + + //Task HeartbeatAsync(MessageReceivedHeartbeat messageReceived); } } diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs index ecc1f88..6ddda15 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs @@ -1,5 +1,4 @@ -using JiShe.CollectBus.Common.Enums; -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -7,49 +6,238 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Protocol.Contracts.Models { + + /// + /// 解析3761报文 + /// public class TB3761 { - public int Id { get; set; } + /// + /// 控制域C + /// + public C? C { get; set; } - public AFN Afn { get; set; } + /// + /// 地址域A + /// + public A? A { get; set; } - public List FnList { get; set; } + /// + /// 帧序列域 SEQ + /// + public SEQ? SEQ { get; set; } + + /// + /// 用户数据区 + /// 功能码 + /// + public AFN_FC? AFN_FC { get; set; } + + /// + /// 用户数据区 + /// 信息点DA Pn + /// + public DA? DA { get; set; } + + /// + /// 用户数据区 + /// 信息类DT Fn + /// + public DT? DT { get; set; } + + /// + /// 数据单元标识和数据单元格式 + /// + public UnitData? UnitData { get; set; } + } + + #region + + /// + /// 报文信息 + /// + public class BaseHexMessage + { + /// + /// 报文 + /// + public string? HexMessageString { get; set; } + + /// + /// 报文数组 + /// + public List? HexMessageList { get; set; } + } + + /// + /// 控制域C + /// + public class C + { + /// + /// 控制域C报文 + /// + public BaseHexMessage? BaseHexMessage { get; set; } + + /// + /// 传输方向位D7 DIR=0,表示此帧报文是由主站发出的下行报文;DIR=1,表示此帧报文是由终端发出的上行报文。 + /// + public int DIR { get; set; } + /// + /// D6启动标志位 0:表示此帧报文来自从动站(终端),1:表示此帧报文来自启动站(服务端) + /// + public int PRM { get; set; } + /// + /// D5下行:帧计数位(FCB)/上行(ACD):要求访问位(终端有重要事件等待访问), + /// + public int FCB { get; set; } + /// + /// 下行:帧计数有效位(决定FCB位有效/无效)/上行:保留 D4 + /// + public int FCV { get; set; } + /// + /// 功能码 D0-D3 + /// + public int FC { get; set; } } - public class TB3761FN - { - public int Id { get; set; } + /// + /// 地址域A + /// + public class A + { + /// + /// 地址域报文 + /// + public BaseHexMessage? BaseHexMessage { get; set; } + + /// + /// 集中器/终端编码 + /// + public string? Code { get; set; } + + /// + /// 行政区划码A1 + /// + public string? A1 { get; set; } + /// + /// 终端地址A2 + /// + public int A2 { get; set; } + /// + /// 站地址和组地址标志A3 + /// + public A3? A3 { get; set; } + } + + /// + /// 站地址和组地址标志A3 + /// + public class A3 + { + /// + /// 地址域A3报文 + /// + public BaseHexMessage? BaseHexMessage { get; set; } + + /// + /// 终端组地址标志,D0=0即False 表示终端地址A2 为单地址 + /// + public int D0 { get; set; } + /// + /// 主站地址 MSA D1~D7 组成 0~127 + /// + public int D1_D7 { get; set; } + } + + + /// + /// 帧序列域 SEQ + /// + public class SEQ + { + /// + /// 帧序列域报文 + /// + public BaseHexMessage? BaseHexMessage { get; set; } + /// + /// 响应帧序号 + /// + public int PSEQ { get; set; } + /// + /// CON为“1”,表示需要对该帧报文进行确认;置“0”,表示不需要对该帧报文进行确认。 + /// + public int CON { get; set; } + /// + /// 末帧标志 + /// + public int FIN { get; set; } + /// + /// 首帧标志 + /// + public int FIR { get; set; } + /// + /// 帧时间标签有效位,TpV=0,表示在附加信息域中无时间标签Tp;TpV=1,表示在附加信息域中带有时间标签Tp + /// + public int TpV { get; set; } + } + + + + /// + /// 用户数据区 + /// 功能码 + /// + public class AFN_FC + { + public BaseHexMessage? BaseHexMessage { get; set; } + + /// + /// 功能码 + /// + public int AFN { get; set; } + } + + + /// + /// 用户数据区 + /// 信息点DA Pn + /// + public class DA + { + public BaseHexMessage? BaseHexMessage { get; set; } + + /// + /// 信息点 DA Pn + /// + public int Pn { get; set; } + } + + + /// + /// 用户数据区 + /// 信息类DT Fn + /// + public class DT + { + public BaseHexMessage? BaseHexMessage { get; set; } + /// + /// 信息类 DT Fn + /// public int Fn { get; set; } - - public string Text { get; set; } - - public List UpList { get; set; } - } - public class TB3761UP + + /// + /// 数据单元标识和数据单元格式 + /// + public class UnitData: BaseHexMessage { - public int Id { get; set; } - public string Name { get; set; } - - public string? Value { get; set; } - - public string DataType { get; set; } - - public int DataIndex { get; set; } - - //public int DataIndex { get; set; } - - public int DataCount { get; set; } - - //public int ParentId { get; set; } - - public int Sort { get; set; } - - public List Tb3761UpChildlList { get; set; } } + #endregion + } diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index 2573ab7..726a976 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -4,22 +4,25 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using Microsoft.Extensions.Logging; +using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Test { - public class TestProtocolPlugin : BaseProtocolPlugin + public class TestProtocolPlugin : ProtocolPlugin { + private readonly ILogger _logger; /// /// Initializes a new instance of the class. /// /// The service provider. - public TestProtocolPlugin(IServiceProvider serviceProvider) : base(serviceProvider) + public TestProtocolPlugin(IServiceProvider serviceProvider, ILogger logger) : base(serviceProvider, logger) { } public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-Test"); - public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) + public override Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) { throw new NotImplementedException(); } @@ -150,6 +153,8 @@ namespace JiShe.CollectBus.Protocol.Test var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}"; return a; } + + #endregion } } diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 5cda0c8..a53f1fb 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -1,5 +1,8 @@ -using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using TouchSocket.Core; using Volo.Abp; using Volo.Abp.Modularity; diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index a28cd2d..8f7af53 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -1,62 +1,245 @@ -using JiShe.CollectBus.Common.Enums; +using DeviceDetectorNET.Parser.Device; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.Enums; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; +using TouchSocket.Sockets; +using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Protocol { - public class StandardProtocolPlugin : BaseProtocolPlugin + public class StandardProtocolPlugin : ProtocolPlugin { + private readonly ILogger _logger; + + private readonly IProducerService _producerService; + + private readonly IRepository _deviceRepository; /// /// Initializes a new instance of the class. /// /// The service provider. - public StandardProtocolPlugin(IServiceProvider serviceProvider) : base(serviceProvider) + public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger) : base(serviceProvider, logger) { + _logger= logger; + //_logger = serviceProvider.GetRequiredService>(); + _producerService = serviceProvider.GetRequiredService(); + _deviceRepository = serviceProvider.GetRequiredService>(); } public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); - public override async Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) + public override async Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? sendAction = null) { - var hexStringList = messageReceived.MessageHexString.StringToPairs(); - var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); - var afn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); - var fn = (int)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); - - T analyze = default; - - switch ((AFN)afn) + TB3761? tB3761 = Analysis3761(messageReceived); + if (tB3761 != null) { - case AFN.确认或否认: - AnalyzeAnswerDataAsync(messageReceived, sendAction); - break; - case AFN.设置参数: break; - case AFN.查询参数: break; - case AFN.请求实时数据: - if (Enum.IsDefined(typeof(ATypeOfDataItems), fn)) + if (tB3761.AFN_FC?.AFN == (int)AFN.链路接口检测) + { + if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null) { - analyze = (T?)AnalyzeReadingDataAsync(messageReceived, sendAction); + _logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}"); } - break; - case AFN.请求历史数据: - if (Enum.IsDefined(typeof(IIdataTypeItems), fn)) + else { - analyze = (T?)AnalyzeReadingTdcDataAsync(messageReceived, sendAction); + if (tB3761.SEQ.CON == 1) + { + if (tB3761.DT?.Fn == 1) + { + // 登录回复 + await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); + } + else if (tB3761.DT?.Fn == 2) + { + // 心跳回复 + await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ); + } + } } - break; - case AFN.数据转发: - AnalyzeTransparentForwardingAnswerAsync(messageReceived, sendAction); - break; + + } + } - return await Task.FromResult(analyze); + return (tB3761 as T)!; } + + /// + /// 登录回复 + /// + /// + /// + /// + /// + /// + public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) + { + string oldClientId = $"{client.Id}"; + + await client.ResetIdAsync(code); + + var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code); + if (entity == null) + { + await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); + } + else + { + entity.UpdateByLoginAndHeartbeat(oldClientId); + await _deviceRepository.UpdateAsync(entity); + } + + var messageReceivedLoginEvent = new MessageReceivedLogin + { + ClientId = code, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageReceived, + DeviceNo = code, + MessageId = Guid.NewGuid().ToString() + }; + + //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + + + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + + //await _producerBus.Publish( messageReceivedLoginEvent); + + + var reqParam = new ReqParameter2 + { + AFN = AFN.确认或否认, + FunCode = (int)CFromStationFunCode.链路数据, + PRM = PRM.从动站报文, + A =code, + Seq = new Seq() + { + TpV = TpV.附加信息域中无时间标签, + FIRFIN = FIRFIN.单帧, + CON = CON.需要对该帧进行确认, + PRSEQ = pseq!.Value + }, + MSA = msa!.Value, + Pn = 0, + Fn = 1 + }; + var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedLoginEvent.ClientId, DeviceNo = messageReceivedLoginEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId }); + //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + } + + /// + /// 心跳帧解析 + /// + /// + /// + /// + /// + /// + public async Task HeartbeatAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) + { + + string clientId = code; + string oldClientId = $"{client.Id}"; + + var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code); + if (entity == null) //没有登录帧的设备,只有心跳帧 + { + await client.ResetIdAsync(clientId); + await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); + } + else + { + if (clientId != oldClientId) + { + entity.UpdateByLoginAndHeartbeat(oldClientId); + } + else + { + entity.UpdateByLoginAndHeartbeat(); + } + + await _deviceRepository.UpdateAsync(entity); + } + + var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat + { + ClientId = clientId, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageReceived, + DeviceNo = code, + MessageId = Guid.NewGuid().ToString() + }; + //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); + + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); + //await _producerBus.Publish(messageReceivedHeartbeatEvent); + + var reqParam = new ReqParameter2() + { + AFN = AFN.确认或否认, + FunCode = (int)CFromStationFunCode.链路数据, + PRM = PRM.从动站报文, + A = code, + Seq = new Seq() + { + TpV = TpV.附加信息域中无时间标签, + FIRFIN = FIRFIN.单帧, + CON = CON.不需要对该帧进行确认, + PRSEQ = pseq!.Value, + }, + MSA = msa!.Value, + Pn = 0, + Fn = 1 + }; + var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceivedHeartbeatEvent.ClientId, DeviceNo = messageReceivedHeartbeatEvent.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceivedHeartbeatEvent.MessageId }); + + //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + + } + + + + + + #region 上行命令 //68 diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 5a99bf1..7e2650f 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -14,6 +14,10 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TouchSocket.Core; using TouchSocket.Sockets; @@ -21,6 +25,7 @@ using Volo.Abp.Caching; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Repositories; +using static System.Formats.Asn1.AsnWriter; using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.Plugins @@ -31,64 +36,83 @@ namespace JiShe.CollectBus.Plugins private readonly ILogger _logger; private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; + private readonly IServiceProvider _serviceProvider; - /// - /// - /// - /// - /// - /// - /// + /// + /// + /// + /// + /// + /// + /// + /// public TcpMonitor(IProducerService producerService, ILogger logger, IRepository deviceRepository, - IDistributedCache ammeterInfoCache) + IDistributedCache ammeterInfoCache, IServiceProvider serviceProvider) { _producerService = producerService; _logger = logger; _deviceRepository = deviceRepository; _ammeterInfoCache = ammeterInfoCache; + _serviceProvider= serviceProvider; + + } public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e) { var messageHexString = Convert.ToHexString(e.ByteBlock.Span); - var hexStringList = messageHexString.StringToPairs(); - var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); - var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); - var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); - if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1)) - { - var tcpSessionClient = (ITcpSessionClient)client; - if ((AFN)aFn == AFN.链路接口检测) - { - switch (fn) - { - case 1: - await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1); - break; - case 3: - //心跳帧有两种情况: - //1. 集中器先有登录帧,再有心跳帧 - //2. 集中器没有登录帧,只有心跳帧 - await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1); - break; - default: - _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); - break; - } - } - else - { - await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1,aFn.ToString()!.PadLeft(2,'0')); - } - } - else + var tcpSessionClient = (ITcpSessionClient)client; + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) { - _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); + _logger.LogError("协议不存在!"); } + protocolPlugin.Analysis3761(messageHexString); + + TB3761? tB3761 = await protocolPlugin!.AnalyzeAsync(tcpSessionClient, messageHexString); + + + + //var hexStringList = messageHexString.StringToPairs(); + //var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); + //var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); + //var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); + //if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1)) + //{ + //var tcpSessionClient = (ITcpSessionClient)client; + + //if ((AFN)aFn == AFN.链路接口检测) + //{ + // switch (fn) + // { + // case 1: + // await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1); + // break; + // case 3: + // //心跳帧有两种情况: + // //1. 集中器先有登录帧,再有心跳帧 + // //2. 集中器没有登录帧,只有心跳帧 + // await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1); + // break; + // default: + // _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); + // break; + // } + //} + //else + //{ + //await OnTcpNormalReceived(tcpSessionClient, messageHexString); // , aTuple.Item1, aFn.ToString()!.PadLeft(2, '0') + //} + //} + //else + //{ + // _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); + //} + await e.InvokeNext(); } @@ -236,8 +260,18 @@ namespace JiShe.CollectBus.Plugins /// /// /// - private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo,string aFn) + private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, string messageHexString) //, string deviceNo,string aFn { + var _protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (_protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + TB3761? tB3761 = await _protocolPlugin!.AnalyzeAsync(tcpSessionClient, messageHexString); + + + + //await _producerBus.Publish(new MessageReceived //{ // ClientId = client.Id, @@ -260,15 +294,19 @@ namespace JiShe.CollectBus.Plugins // DeviceNo = deviceNo, // MessageId = NewId.NextGuid().ToString() //}); - await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived - { - ClientId = client.Id, - ClientIp = client.IP, - ClientPort = client.Port, - MessageHexString = messageHexString, - DeviceNo = deviceNo, - MessageId = Guid.NewGuid().ToString() - }); + + //TODO:根据AFN进行分流推送到kafka + + + //await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + //{ + // ClientId = client.Id, + // ClientIp = client.IP, + // ClientPort = client.Port, + // MessageHexString = messageHexString, + // DeviceNo = deviceNo, + // MessageId = Guid.NewGuid().ToString() + //}); } } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 889cd91..17a6120 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -151,16 +151,14 @@ namespace JiShe.CollectBus.Subscribers } else { - //todo 会根据不同的协议进行解析,然后做业务处理 - TB3761 fN = await protocolPlugin.AnalyzeAsync(receivedMessage); - if(fN == null) + TB3761 tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); + if (tB3761 == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); } - var tb3761FN = fN.FnList.FirstOrDefault(); - if (tb3761FN == null) + if (tB3761.DT == null || tB3761.AFN_FC == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); return SubscribeAck.Success(); @@ -170,8 +168,8 @@ namespace JiShe.CollectBus.Subscribers var entity = new MeterReadingRecords() { ReceivedMessageHexString = receivedMessage.MessageHexString, - AFN = fN.Afn, - Fn = tb3761FN.Fn, + AFN = (AFN)tB3761.AFN_FC.AFN, + Fn = tB3761.DT.Fn, Pn = 0, FocusAddress = "", MeterAddress = "", @@ -208,7 +206,7 @@ namespace JiShe.CollectBus.Subscribers } else { - await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); + //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); } } @@ -228,7 +226,7 @@ namespace JiShe.CollectBus.Subscribers } else { - await protocolPlugin.LoginAsync(receivedLoginMessage); + //await protocolPlugin.LoginAsync(receivedLoginMessage); await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); } } diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs index f4dd11d..a96c16a 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs @@ -375,22 +375,30 @@ namespace JiShe.CollectBus.Common.BuildSendDatas #region 排序 var timeSets = timeSetDetails; - Dictionary dicTsDetails = new Dictionary(); + // 旧 + //Dictionary dicTsDetails = new Dictionary(); + //foreach (var timeSet in timeSets) + //{ + // int firstMonty = timeSet.Months[0]; + // if (!dicTsDetails.Keys.Contains(firstMonty)) + // dicTsDetails.Add(firstMonty, timeSet); + //} + + //var sortKeys = dicTsDetails.Keys.OrderBy(n => n).ToList(); + //List orderTsDetails = new List(); + //foreach (var key in sortKeys) + //{ + // orderTsDetails.Add(dicTsDetails[key]); + //} + + //timeSetDetails = orderTsDetails; + + // 新 foreach (var timeSet in timeSets) { - int firstMonty = timeSet.Months[0]; - if (!dicTsDetails.Keys.Contains(firstMonty)) - dicTsDetails.Add(firstMonty, timeSet); + timeSet.Months = timeSet.Months.OrderBy(m => m).ToArray(); } - - var sortKeys = dicTsDetails.Keys.OrderBy(n => n).ToList(); - List orderTsDetails = new List(); - foreach (var key in sortKeys) - { - orderTsDetails.Add(dicTsDetails[key]); - } - - timeSetDetails = orderTsDetails; + timeSetDetails = timeSets; #endregion diff --git a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs index 94e7421..8eeb1ff 100644 --- a/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs +++ b/shared/JiShe.CollectBus.Common/Extensions/StringExtensions.cs @@ -1128,6 +1128,21 @@ namespace JiShe.CollectBus.Common.Extensions return string.Join(" ", strArr.Reverse()); } + + /// + /// 高低位反转,并转换成字符串 + /// + /// 16进制字符集合 + /// 16进制字符集合开始索引 + /// 取多少个数据 + /// + public static string ListReverseToStr(this List list, int index, int count) + { + var addrList = list.GetRange(index, count); + addrList.Reverse();//高低位反转 + return string.Join("", addrList); + } + /// /// 二进制转十六进制 /// @@ -1163,6 +1178,18 @@ namespace JiShe.CollectBus.Common.Extensions return decimalNumber; } + ///// + ///// 十六进制转二进制 + ///// + ///// + ///// + //public static string HexToBin(this string hexString) + //{ + // var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2); + // return binaryValue; + //} + + /// /// 十六进制转二进制 /// @@ -1170,8 +1197,14 @@ namespace JiShe.CollectBus.Common.Extensions /// public static string HexToBin(this string hexString) { - var binaryValue = Convert.ToString(Convert.ToInt32(hexString, 16), 2); - return binaryValue; + string result = string.Empty; + foreach (char c in hexString) + { + int v = Convert.ToInt32(c.ToString(), 16); + int v2 = int.Parse(Convert.ToString(v, 2)); + result += string.Format("{0:d4}", v2); + } + return result; } /// diff --git a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 3e60600..b4f7d03 100644 --- a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -62,7 +62,10 @@ - Always + PreserveNewest + + + PreserveNewest diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 9d1f7cb..4c3a7e5 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,7 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus2" + "ServerTagName": "JiSheCollectBus99" }, "IoTDBOptions": { "UserName": "root",