调整框架

This commit is contained in:
ChenYi 2025-04-25 14:37:35 +08:00
parent a5cd6b86f2
commit 51b7147a38
10 changed files with 182 additions and 561 deletions

View File

@ -45,7 +45,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "5.Shared", "5.Shared", "{EB
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "modules\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{6D6A2A58-7406-9C8C-7B23-3E442CCE3E6B}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "6", "6", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "6.External", "6.External", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T1882018", "protocols\JiShe.CollectBus.Protocol.T1882018\JiShe.CollectBus.Protocol.T1882018.csproj", "{430D298B-377E-49B8-83AA-ADC7C0EBDB0F}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T1882018", "protocols\JiShe.CollectBus.Protocol.T1882018\JiShe.CollectBus.Protocol.T1882018.csproj", "{430D298B-377E-49B8-83AA-ADC7C0EBDB0F}"
EndProject EndProject
@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol",
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T6452007", "protocols\JiShe.CollectBus.Protocol.T6452007\JiShe.CollectBus.Protocol.T6452007.csproj", "{75B7D419-C261-577D-58D6-AA3ACED9129F}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T6452007", "protocols\JiShe.CollectBus.Protocol.T6452007\JiShe.CollectBus.Protocol.T6452007.csproj", "{75B7D419-C261-577D-58D6-AA3ACED9129F}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.PluginFileWatcher", "external\JiShe.CollectBus.PluginFileWatcher\JiShe.CollectBus.PluginFileWatcher.csproj", "{0F67A493-A4DF-550E-AB4D-95F55144C706}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -141,6 +143,10 @@ Global
{75B7D419-C261-577D-58D6-AA3ACED9129F}.Debug|Any CPU.Build.0 = Debug|Any CPU {75B7D419-C261-577D-58D6-AA3ACED9129F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{75B7D419-C261-577D-58D6-AA3ACED9129F}.Release|Any CPU.ActiveCfg = Release|Any CPU {75B7D419-C261-577D-58D6-AA3ACED9129F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{75B7D419-C261-577D-58D6-AA3ACED9129F}.Release|Any CPU.Build.0 = Release|Any CPU {75B7D419-C261-577D-58D6-AA3ACED9129F}.Release|Any CPU.Build.0 = Release|Any CPU
{0F67A493-A4DF-550E-AB4D-95F55144C706}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0F67A493-A4DF-550E-AB4D-95F55144C706}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0F67A493-A4DF-550E-AB4D-95F55144C706}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0F67A493-A4DF-550E-AB4D-95F55144C706}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -166,6 +172,7 @@ Global
{8A61DF78-069B-40B5-8811-614E2960443E} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {8A61DF78-069B-40B5-8811-614E2960443E} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{E27377CC-E2D3-4237-060F-96EA214D3129} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {E27377CC-E2D3-4237-060F-96EA214D3129} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{75B7D419-C261-577D-58D6-AA3ACED9129F} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC} {75B7D419-C261-577D-58D6-AA3ACED9129F} = {3C3F9DB2-EC97-4464-B49F-BF1A0C2B46DC}
{0F67A493-A4DF-550E-AB4D-95F55144C706} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -1,15 +1,7 @@
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.AnalysisData; using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Serilog.Core;
using System;
using System.Reflection; using System.Reflection;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -21,8 +13,6 @@ namespace JiShe.CollectBus.Protocol
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.AddKeyedSingleton<IProtocolPlugin, T1882018ProtocolPlugin>(nameof(T1882018ProtocolPlugin)); context.Services.AddKeyedSingleton<IProtocolPlugin, T1882018ProtocolPlugin>(nameof(T1882018ProtocolPlugin));
//RegisterProtocolAnalysis(context.Services);
LoadAnalysisStrategy(context.Services);
} }
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
@ -37,75 +27,5 @@ namespace JiShe.CollectBus.Protocol
Console.WriteLine("StandardProtocolPlugin OnApplicationShutdown"); Console.WriteLine("StandardProtocolPlugin OnApplicationShutdown");
base.OnApplicationShutdown(context); base.OnApplicationShutdown(context);
} }
public void LoadAnalysisStrategy(IServiceCollection services)
{
var assembly = Assembly.GetExecutingAssembly();
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
foreach (var analysisStrategyType in analysisStrategyTypes)
{
var service = analysisStrategyType.GetInterfaces().First();
services.AddKeyedSingleton(service, analysisStrategyType.Name,analysisStrategyType);
}
}
public void RegisterProtocolAnalysis(IServiceCollection services)
{
// 扫描并注册所有策略
var strategyMetadata = new Dictionary<(string, Type, Type), Type>();
services.AddTransient<AnalysisStrategyContext>();
// 批量注册
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (string.IsNullOrWhiteSpace(assemblyPath))
{
return;
}
var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins") , "*.dll");
foreach (var file in dllFiles)
{
// 跳过已加载的程序集
var assemblyName = AssemblyName.GetAssemblyName(file);
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IAnalysisStrategy接口
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
if (!analysisStrategyTypes.Any())
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// 通过反射获取静态元数据
var strategyType = analysisStrategyType.Name;
var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments();
var inputType = genericArgs[0];
var resultType = genericArgs[1];
// 注册策略实现
services.AddTransient(analysisStrategyType);
strategyMetadata[(strategyType, inputType, resultType)] = analysisStrategyType;
}
}
// 注册元数据字典
services.AddSingleton(strategyMetadata);
// 注册策略解析工厂
services.AddTransient<Func<string, Type, Type, object?>>(provider => (name, inputType, resultType) =>
{
var metadata = provider.GetRequiredService<Dictionary<(string, Type, Type), Type>>();
if (metadata.TryGetValue((name, inputType, resultType), out var strategyType))
{
return provider.GetRequiredService(strategyType);
}
else
{
var logger= provider.GetRequiredService<ILogger<AnalysisStrategyContext>>();
logger.LogWarning($"未能找到解析策略:{name}-{inputType}-{resultType}");
return null;
}
});
}
} }
} }

View File

@ -17,12 +17,12 @@ namespace JiShe.CollectBus.Protocol.SendData
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <returns></returns> /// <returns></returns>
public delegate Telemetry1882018PacketResponse T6452007Delegate(Telemetry1882018PacketRequest request); public delegate Telemetry1882018PacketResponse T1882018Delegate(Telemetry1882018PacketRequest request);
/// <summary> /// <summary>
/// 编码与方法的映射表 /// 编码与方法的映射表
/// </summary> /// </summary>
public static readonly Dictionary<string, T6452007Delegate> T645ControlHandlers = new(); public static readonly Dictionary<string, T1882018Delegate> T1882018ControlHandlers = new();
static Telemetry1882018PacketBuilder() static Telemetry1882018PacketBuilder()
{ {
@ -33,8 +33,8 @@ namespace JiShe.CollectBus.Protocol.SendData
if (method.Name.StartsWith("C") && method.Name.EndsWith("_Send")) if (method.Name.StartsWith("C") && method.Name.EndsWith("_Send"))
{ {
string code = method.Name; string code = method.Name;
var delegateInstance = (T6452007Delegate)Delegate.CreateDelegate(typeof(T6452007Delegate), method); var delegateInstance = (T1882018Delegate)Delegate.CreateDelegate(typeof(T1882018Delegate), method);
T645ControlHandlers[code] = delegateInstance; T1882018ControlHandlers[code] = delegateInstance;
} }
} }
} }

View File

@ -9,8 +9,6 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData; using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.SendData; using JiShe.CollectBus.Protocol.SendData;
using Mapster; using Mapster;
@ -30,285 +28,61 @@ namespace JiShe.CollectBus.Protocol
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly ITcpService _tcpService; private readonly ITcpService _tcpService;
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers; public readonly Dictionary<string, Telemetry1882018PacketBuilder.T1882018Delegate> T188ControlHandlers;
public readonly Dictionary<string, Telemetry1882018PacketBuilder.T6452007Delegate> T645ControlHandlers;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="T1882018ProtocolPlugin"/> class. /// Initializes a new instance of the <see cref="T1882018ProtocolPlugin"/> class.
/// </summary> /// </summary>
/// <param name="serviceProvider">The service provider.</param> /// <param name="serviceProvider">The service provider.</param>
public T1882018ProtocolPlugin(IServiceProvider serviceProvider, ILogger<T1882018ProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger) public T1882018ProtocolPlugin(IServiceProvider serviceProvider, ILogger<T1882018ProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger, tcpService)
{ {
_logger = logger; _logger = logger;
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>(); //_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
_producerService = serviceProvider.GetRequiredService<IProducerService>(); _producerService = serviceProvider.GetRequiredService<IProducerService>();
_deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>(); _deviceRepository = serviceProvider.GetRequiredService<IRepository<Device, Guid>>();
_tcpService = tcpService; _tcpService = tcpService;
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers; T188ControlHandlers = Telemetry1882018PacketBuilder.T1882018ControlHandlers;
T645ControlHandlers = Telemetry1882018PacketBuilder.T645ControlHandlers;
} }
public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); public sealed override ProtocolInfo Info => new(nameof(T1882018ProtocolPlugin), "376.1/188", "TCP", "376.1协议", "DTS1980");
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null) public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{ {
TB3761? tB3761 = Analysis3761(messageReceived); //TB3761? tB3761 = Analysis3761(messageReceived);
if (tB3761 != null) //if (tB3761 != null)
{
if (tB3761.AFN_FC?.AFN == (int)AFN.)
{
if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
{
_logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
}
else
{
if (tB3761.DT?.Fn == (int)FN.)
{
// 登录回复
if (tB3761.SEQ.CON == (int)CON.)
await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
else if (tB3761.DT?.Fn == (int)FN.)
{
// 心跳回复
//心跳帧有两种情况:
//1. 集中器先有登录帧,再有心跳帧
//2. 集中器没有登录帧,只有心跳帧
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
}
}
await OnTcpNormalReceived(client, tB3761);
}
return (tB3761 as T)!;
}
/// <summary>
/// 正常帧处理将不同的AFN进行分发
/// </summary>
/// <param name="tcpSessionClient"></param>
/// <param name="messageHexString"></param>
/// <param name="tB3761"></param>
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, TB3761 tB3761)
{
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
//todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算?
//await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
//{ //{
// ClientId = client.Id, // if (tB3761.AFN_FC?.AFN == (int)AFN.链路接口检测)
// ClientIp = client.IP, // {
// ClientPort = client.Port, // if (tB3761.A == null || tB3761.A.Code.IsNullOrWhiteSpace() || tB3761.A.A3?.D1_D7 == null || tB3761.SEQ?.PSEQ == null)
// MessageHexString = messageHexString, // {
// DeviceNo = deviceNo, // _logger.LogError($"解析AFN.链路接口检测报文失败,报文:{messageReceived},TB3761:{tB3761.Serialize()}");
// MessageId = NewId.NextGuid().ToString() // }
//}); // else
// {
// if (tB3761.DT?.Fn == (int)FN.登录)
// {
// // 登录回复
// if (tB3761.SEQ.CON == (int)CON.需要对该帧进行确认)
// await LoginAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// else if (tB3761.DT?.Fn == (int)FN.心跳)
// {
// // 心跳回复
// //心跳帧有两种情况:
// //1. 集中器先有登录帧,再有心跳帧
// //2. 集中器没有登录帧,只有心跳帧
// await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
// }
// }
if (tB3761.AFN_FC.BaseHexMessage == null || tB3761.DT.BaseHexMessage == null || tB3761.BaseHexMessage.HexMessageString==null) // }
{ // await OnTcpNormalReceived(client, tB3761);
_logger.LogError("376.1协议解析AFN失败"); //}
return; //return (tB3761 as T)!;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
if (tB3761.DT.Fn == (int)FN. || tB3761.DT.Fn == (int)FN.)
return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC.AFN.ToString().PadLeft(2, '0'));
MessageProtocolAnalysis<TB3761> messageReceivedAnalysis = new MessageProtocolAnalysis<TB3761>()
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = tB3761.BaseHexMessage.HexMessageString!,
DeviceNo = tB3761.A.Code!,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Data = tB3761
};
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if (topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, messageReceivedAnalysis);
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, messageReceivedAnalysis);
}
return null;
} }
/// <summary>
/// 登录回复
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code);
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null)
{
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedLoginEvent = new MessageReceivedLogin
{
ClientId = code,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime=DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
var reqParam = new ReqParameter2
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
var issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedLoginEvent.ClientId,
DeviceNo = messageReceivedLoginEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, issuedEventMessage);
}
}
/// <summary>
/// 心跳帧解析
/// </summary>
/// <param name="client"></param>
/// <param name="code"></param>
/// <param name="msa"></param>
/// <param name="pseq"></param>
/// <returns></returns>
public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string clientId = code;
string oldClientId = $"{client.Id}";
var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == code);
if (deviceInfoList != null && deviceInfoList.Count > 1)
{
//todo 推送集中器编号重复预警
_logger.LogError($"集中器编号:{code},存在多个集中器,请检查集中器编号是否重复");
return;
}
var entity = deviceInfoList?.FirstOrDefault(a => a.Number == code);
if (entity == null) //没有登录帧的设备,只有心跳帧
{
await client.ResetIdAsync(clientId);
await _deviceRepository.InsertAsync(new Device(code, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
}
else
{
if (clientId != oldClientId)
{
entity.UpdateByLoginAndHeartbeat(oldClientId);
}
else
{
entity.UpdateByLoginAndHeartbeat();
}
await _deviceRepository.UpdateAsync(entity);
}
var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat
{
ClientId = clientId,
ClientIp = client.IP,
ClientPort = client.Port,
MessageHexString = messageReceived,
DeviceNo = code,
MessageId = Guid.NewGuid().ToString(),
ReceivedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")
};
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
var reqParam = new ReqParameter2()
{
AFN = AFN.,
FunCode = (int)CFromStationFunCode.,
PRM = PRM.,
A = code,
Seq = new Seq()
{
TpV = TpV.,
FIRFIN = FIRFIN.,
CON = CON.,
PRSEQ = pseq!.Value,
},
MSA = msa!.Value,
Pn = 0,
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
DeviceNo = messageReceivedHeartbeatEvent.DeviceNo,
Message = bytes,
Type = IssuedEventType.Heartbeat,
MessageId = messageReceivedHeartbeatEvent.MessageId
};
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
{
await _tcpService.SendAsync(issuedEventMessage.ClientId, bytes);
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage);
}
}
/// <summary> /// <summary>
/// 组装报文 /// 组装报文
/// </summary> /// </summary>
@ -333,7 +107,7 @@ namespace JiShe.CollectBus.Protocol
var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send"; var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send";
Telemetry1882018PacketResponse t645PacketResponse = null; Telemetry1882018PacketResponse t645PacketResponse = null;
if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName if (T188ControlHandlers != null && T188ControlHandlers.TryGetValue(t645PacketHandlerName
, out var t645PacketHandler)) , out var t645PacketHandler))
{ {
t645PacketResponse = t645PacketHandler(new Telemetry1882018PacketRequest() t645PacketResponse = t645PacketHandler(new Telemetry1882018PacketRequest()
@ -351,7 +125,7 @@ namespace JiShe.CollectBus.Protocol
} }
string afnMethonCode = $"AFN{aFNStr}_Fn_Send"; string afnMethonCode = $"AFN{aFNStr}_Fn_Send";
if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode if (base.T3761AFNHandlers != null && base.T3761AFNHandlers.TryGetValue(afnMethonCode
, out var handler)) , out var handler))
{ {
builderResponse = handler(new Telemetry3761PacketRequest() builderResponse = handler(new Telemetry3761PacketRequest()
@ -373,134 +147,5 @@ namespace JiShe.CollectBus.Protocol
return await Task.FromResult(result); return await Task.FromResult(result);
} }
#region
//68
//32 00
//32 00
//68
//C9 1100'1001. 控制域C。
// D7=1, (终端发送)上行方向。
// D6=1, 此帧来自启动站。
// D5=0, (上行方向)要求访问位。表示终端无事件数据等待访问。
// D4=0, 保留
// D3~D0=9, 功能码。链路测试
//20 32 行政区划码
//90 26 终端地址
//00 主站地址和组地址标志。终端为单地址。 //3220 09 87 2
// 终端启动的发送帧的 MSA 应为 0, 其主站响应帧的 MSA 也应为 0.
//02 应用层功能码。AFN=2, 链路接口检测
//70 0111'0000. 帧序列域。无时间标签、单帧、需要确认。
//00 00 信息点。DA1和DA2全为“0”时表示终端信息点。
//01 00 信息类。F1, 登录。
//44 帧尾,包含用户区数据校验和
//16 帧结束标志
/// <summary>
/// 解析上行命令
/// </summary>
/// <param name="cmd"></param>
/// <returns></returns>
public CommandReulst? AnalysisCmd(string cmd)
{
CommandReulst? commandReulst = null;
var hexStringList = cmd.StringToPairs();
if (hexStringList.Count < hearderLen)
{
return commandReulst;
}
//验证起始字符
if (!hexStringList[0].IsStartStr() || !hexStringList[5].IsStartStr())
{
return commandReulst;
}
var lenHexStr = $"{hexStringList[2]}{hexStringList[1]}";
var lenBin = lenHexStr.HexToBin();
var len = lenBin.Remove(lenBin.Length - 2).BinToDec();
//验证长度
if (hexStringList.Count - 2 != hearderLen + len)
return commandReulst;
var userDataIndex = hearderLen;
var c = hexStringList[userDataIndex];//控制域 1字节
userDataIndex += 1;
var aHexList = hexStringList.Skip(userDataIndex).Take(5).ToList();//地址域 5字节
var a = AnalysisA(aHexList);
var a3Bin = aHexList[4].HexToBin().PadLeft(8, '0');
var mSA = a3Bin.Substring(0, 7).BinToDec();
userDataIndex += 5;
var aFN = (AFN)hexStringList[userDataIndex].HexToDec();//1字节
userDataIndex += 1;
var seq = hexStringList[userDataIndex].HexToBin().PadLeft(8, '0');
var tpV = (TpV)Convert.ToInt32(seq.Substring(0, 1));
var fIRFIN = (FIRFIN)Convert.ToInt32(seq.Substring(1, 2));
var cON = (CON)Convert.ToInt32(seq.Substring(3, 1));
var prseqBin = seq.Substring(4, 4);
userDataIndex += 1;
// (DA2 - 1) * 8 + DA1 = pn
var da1Bin = hexStringList[userDataIndex].HexToBin();
var da1 = da1Bin == "0" ? 0 : da1Bin.Length;
userDataIndex += 1;
var da2 = hexStringList[userDataIndex].HexToDec();
var pn = da2 == 0 ? 0 : (da2 - 1) * 8 + da1;
userDataIndex += 1;
//(DT2*8)+DT1=fn
var dt1Bin = hexStringList[userDataIndex].HexToBin();
var dt1 = dt1Bin != "0" ? dt1Bin.Length : 0;
userDataIndex += 1;
var dt2 = hexStringList[userDataIndex].HexToDec();
var fn = dt2 * 8 + dt1;
userDataIndex += 1;
//数据单元
var datas = hexStringList.Skip(userDataIndex).Take(len + hearderLen - userDataIndex).ToList();
//EC
//Tp
commandReulst = new CommandReulst()
{
A = a,
MSA = mSA,
AFN = aFN,
Seq = new Seq()
{
TpV = tpV,
FIRFIN = fIRFIN,
CON = cON,
PRSEQ = prseqBin.BinToDec(),
},
CmdLength = len,
Pn = pn,
Fn = fn,
HexDatas = datas
};
return commandReulst;
}
/// <summary>
/// 解析地址
/// </summary>
/// <param name="aHexList"></param>
/// <returns></returns>
private string AnalysisA(List<string> aHexList)
{
var a1 = aHexList[1] + aHexList[0];
var a2 = aHexList[3] + aHexList[2];
var a2Dec = a2.HexToDec();
var a3 = aHexList[4];
var a = $"{a1}{a2Dec.ToString().PadLeft(5, '0')}";
return a;
}
#endregion
} }
} }

View File

@ -1,5 +1,5 @@
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Protocol.AnalysisData.Appendix namespace JiShe.CollectBus.Protocol.AnalysisData.Appendix

View File

@ -1,5 +1,5 @@
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
namespace JiShe.CollectBus.Protocol.AnalysisData.Appendix namespace JiShe.CollectBus.Protocol.AnalysisData.Appendix
{ {

View File

@ -11,7 +11,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin)); context.Services.AddKeyedSingleton<IProtocolPlugin, T37612012ProtocolPlugin>(nameof(T37612012ProtocolPlugin));
//RegisterProtocolAnalysis(context.Services); //RegisterProtocolAnalysis(context.Services);
LoadAnalysisStrategy(context.Services); LoadAnalysisStrategy(context.Services);
} }
@ -19,7 +19,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{ {
Console.WriteLine("StandardProtocolPlugin OnApplicationInitializationAsync"); Console.WriteLine("StandardProtocolPlugin OnApplicationInitializationAsync");
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin)); var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(T37612012ProtocolPlugin));
await standardProtocol.LoadAsync(); await standardProtocol.LoadAsync();
} }

View File

@ -20,9 +20,9 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol namespace JiShe.CollectBus.Protocol
{ {
public class StandardProtocolPlugin : ProtocolPlugin public class T37612012ProtocolPlugin : ProtocolPlugin
{ {
private readonly ILogger<StandardProtocolPlugin> _logger; private readonly ILogger<T37612012ProtocolPlugin> _logger;
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
@ -32,10 +32,10 @@ namespace JiShe.CollectBus.Protocol
public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers; public readonly Dictionary<string, Telemetry3761PacketBuilder.T3761Delegate> T3761AFNHandlers;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="StandardProtocolPlugin"/> class. /// Initializes a new instance of the <see cref="T37612012ProtocolPlugin"/> class.
/// </summary> /// </summary>
/// <param name="serviceProvider">The service provider.</param> /// <param name="serviceProvider">The service provider.</param>
public StandardProtocolPlugin(IServiceProvider serviceProvider, ILogger<StandardProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger) public T37612012ProtocolPlugin(IServiceProvider serviceProvider, ILogger<T37612012ProtocolPlugin> logger, ITcpService tcpService) : base(serviceProvider, logger)
{ {
_logger = logger; _logger = logger;
//_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>(); //_logger = serviceProvider.GetRequiredService<ILogger<StandardProtocolPlugin>>();
@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Protocol
T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers; T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
} }
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); public override ProtocolInfo Info => new(nameof(T37612012ProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null) public override async Task<T> AnalyzeAsync<T>(ITcpSessionClient client, string messageReceived, Action<T>? sendAction = null)
{ {
@ -304,72 +304,70 @@ namespace JiShe.CollectBus.Protocol
} }
/// <summary>
/// 组装报文
/// </summary>
/// <param name="request">报文构建参数</param>
/// <returns></returns>
public override async Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request)
{
if (request == null)
{
throw new Exception($"{nameof(T37612012ProtocolPlugin)} 报文构建失败,参数为空");
}
var itemCodeArr = request.ItemCode.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
Telemetry3761PacketResponse builderResponse = null;
List<string> dataUnit = new List<string>();
//数据转发场景 10H_F1_1CH
if (aFNStr == "10" && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{
//var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send";
//Telemetry645PacketResponse t645PacketResponse = null;
///// <summary> //if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName
///// 组装报文 // , out var t645PacketHandler))
///// </summary> //{
///// <param name="request">报文构建参数</param> // t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest()
///// <returns></returns> // {
//public override async Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request) // MeterAddress = request.SubProtocolRequest.MeterAddress,
//{ // Password = request.SubProtocolRequest.Password,
// if (request == null) // ItemCode = request.SubProtocolRequest.ItemCode,
// { // });
// throw new Exception($"{nameof(StandardProtocolPlugin)} 报文构建失败,参数为空"); //}
// }
// var itemCodeArr = request.ItemCode.Split('_');
// var aFNStr = itemCodeArr[0];
// var aFN = (AFN)aFNStr.HexToDec();
// var fn = int.Parse(itemCodeArr[1]);
// Telemetry3761PacketResponse builderResponse = null;
// List<string> dataUnit = new List<string>(); //if (t645PacketResponse != null)
// //数据转发场景 10H_F1_1CH //{
// if (aFNStr == "10" && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) // dataUnit = t645PacketResponse.Data;
// { //}
// var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send"; }
// Telemetry645PacketResponse t645PacketResponse = null;
// if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName string afnMethonCode = $"AFN{aFNStr}_Fn_Send";
// , out var t645PacketHandler)) if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode
// { , out var handler))
// t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest() {
// { builderResponse = handler(new Telemetry3761PacketRequest()
// MeterAddress = request.SubProtocolRequest.MeterAddress, {
// Password = request.SubProtocolRequest.Password, FocusAddress = request.FocusAddress,
// ItemCode = request.SubProtocolRequest.ItemCode, Fn = fn,
// }); Pn = request.Pn,
// } DataUnit = dataUnit,
});
}
// if (t645PacketResponse != null) if (builderResponse == null)
// { {
// dataUnit = t645PacketResponse.Data; return new ProtocolBuildResponse();
// } }
// }
// string afnMethonCode = $"AFN{aFNStr}_Fn_Send"; var result = builderResponse.Adapt<ProtocolBuildResponse>();
// if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode result.IsSuccess = true;
// , out var handler))
// {
// builderResponse = handler(new Telemetry3761PacketRequest()
// {
// FocusAddress = request.FocusAddress,
// Fn = fn,
// Pn = request.Pn,
// DataUnit = dataUnit,
// });
// }
// if (builderResponse == null) return await Task.FromResult(result);
// { }
// return new ProtocolBuildResponse();
// }
// var result = builderResponse.Adapt<ProtocolBuildResponse>();
// result.IsSuccess = true;
// return await Task.FromResult(result);
//}
#region #region
@ -499,10 +497,6 @@ namespace JiShe.CollectBus.Protocol
return a; return a;
} }
public override Task<ProtocolBuildResponse> BuildAsync(ProtocolBuildRequest request)
{
throw new NotImplementedException();
}
#endregion #endregion
} }
} }

View File

@ -1,16 +1,10 @@
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData; using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.SendData; using JiShe.CollectBus.Protocol.SendData;
using Mapster; using Mapster;

View File

@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
/// <summary>
/// T188报文项编码
/// </summary>
public class T1882018PacketItemCodeConst
{
#region
#region
/// <summary>
/// 跳闸
/// </summary>
public const string C1C01A = "1C_1A";
/// <summary>
/// 单相合闸
/// </summary>
public const string C1C01B = "1C_1B";
/// <summary>
/// 三相合闸
/// </summary>
public const string C1C01C = "1C_1C";
/// <summary>
/// 触发报警
/// </summary>
public const string C1C02A = "1C_2A";
/// <summary>
/// 报警解除
/// </summary>
public const string C1C02B = "1C_2B";
/// <summary>
/// 保电开始
/// </summary>
public const string C1C03A = "1C_3A";
/// <summary>
/// 保电结束
/// </summary>
public const string C1C03B = "1C_3B";
#endregion
#region 广
/// <summary>
/// 广播校时
/// </summary>
public const string C08 = "08";
#endregion
#endregion
}
}