修改代码

This commit is contained in:
zenghongyao 2025-04-22 16:48:53 +08:00
parent b1a5d29fa7
commit 7c55d96b7c
13 changed files with 334 additions and 53 deletions

View File

@ -65,7 +65,7 @@ namespace JiShe.CollectBus.Kafka
// 实现IKafkaSubscribe接口
var subscribeTypes = assembly.GetTypes().Where(type =>
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
!type.IsAbstract && !type.IsInterface).ToList(); ;
!type.IsAbstract && !type.IsInterface).ToList();
if (subscribeTypes.Count == 0)
continue;

View File

@ -66,6 +66,11 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
TB3761 tB3761 = new TB3761
{
BaseHexMessage = new BaseHexMessage
{
HexMessageString = messageReceived,
HexMessageList = hexStringList
},
C = Analysis_C(hexStringList),
A = Analysis_A(hexStringList),
AFN_FC = Analysis_AFN_FC(hexStringList),

View File

@ -0,0 +1,25 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class AnalysisStrategyContext<TInput, TResult>
{
private readonly IAnalysisStrategy<TInput, TResult> _analysisStrategy;
public AnalysisStrategyContext(IAnalysisStrategy<TInput, TResult> analysisStrategy)
{
_analysisStrategy = analysisStrategy;
}
public Task<TResult> ExecuteAnalysisStrategy(TInput input)
{
return _analysisStrategy.ExecuteAsync(input);
}
}
}

View File

@ -0,0 +1,15 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{
public interface IAnalysisStrategy<TInput, TResult>
{
Task<TResult> ExecuteAsync(TInput input);
}
}

View File

@ -12,6 +12,11 @@ namespace JiShe.CollectBus.Protocol.Contracts.Models
/// </summary>
public class TB3761
{
/// <summary>
/// 报文
/// </summary>
public BaseHexMessage? BaseHexMessage { get; set;}
/// <summary>
/// 控制域C
/// </summary>

View File

@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Dto
{
public class AFN0_F1_AnalysisDto: UnitDataDto
{
}
}

View File

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Dto
{
public class UnitDataDto
{
/// <summary>
/// 集中器地址
/// </summary>
public string? Code { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public int AFN { get; set; }
/// <summary>
/// 信息点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 信息类
/// </summary>
public int Fn { get; set; }
/// <summary>
/// 数据时标最近数据时间点的时间8:00 08:15 记录08:15
/// </summary>
public string? DataTime { get; set; }
/// <summary>
/// 密度(分)
/// </summary>
public int TimeDensity { get; set; }
}
}

View File

@ -0,0 +1,45 @@
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Dto;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H
{
public class AFN0_F1_Analysis: IAnalysisStrategy<TB3761, AFN0_F1_AnalysisDto>
{
private readonly ILogger<AFN0_F1_Analysis> _logger;
public AFN0_F1_Analysis(ILogger<AFN0_F1_Analysis> logger)
{
_logger = logger;
}
public Task<AFN0_F1_AnalysisDto> ExecuteAsync(TB3761 tB3761)
{
try
{
ArgumentNullException.ThrowIfNull(nameof(tB3761));
AFN0_F1_AnalysisDto dto = new AFN0_F1_AnalysisDto
{
Code = tB3761.A?.Code,
AFN = tB3761.AFN_FC?.AFN ?? 0,
Fn = tB3761.DT?.Fn ?? 0,
Pn = tB3761.DA?.Pn ?? 0
};
return Task.FromResult(dto);
}
catch (Exception ex)
{
_logger.LogError(ex, $"00_1解析失败:{tB3761.A?.Code}-{tB3761.DT?.Fn ?? 0}-{tB3761?.BaseHexMessage?.HexMessageString},{ex.Message}");
return null;
}
}
}
}

View File

@ -1,7 +1,12 @@
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.AnalysisData;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Serilog.Core;
using System.Reflection;
using TouchSocket.Core;
using Volo.Abp;
using Volo.Abp.Modularity;
@ -13,6 +18,7 @@ namespace JiShe.CollectBus.Protocol
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddKeyedSingleton<IProtocolPlugin, StandardProtocolPlugin>(nameof(StandardProtocolPlugin));
RegisterProtocolAnalysis(context.Services);
}
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
@ -20,5 +26,45 @@ namespace JiShe.CollectBus.Protocol
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService<IProtocolPlugin>(nameof(StandardProtocolPlugin));
await standardProtocol.AddAsync();
}
public void RegisterProtocolAnalysis(IServiceCollection services)
{
// 批量注册
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (string.IsNullOrWhiteSpace(assemblyPath))
{
return;
}
var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins") , "*.dll");
foreach (var file in dllFiles)
{
// 跳过已加载的程序集
var assemblyName = AssemblyName.GetAssemblyName(file);
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IAnalysisStrategy接口
//var analysisStrategyTypes = assembly.GetTypes().Where(t =>!t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i =>
//i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>))).ToList();
var analysisStrategyTypes = assembly.GetTypes().Where(type =>
typeof(IAnalysisStrategy<,>).IsAssignableFrom(type) &&
!type.IsAbstract && !type.IsInterface).ToList();
if (analysisStrategyTypes.Count == 0)
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// 取所有接口
//var interfaceTypes = analysisStrategyType.GetInterfaces()
// .Where(i => i.IsGenericType &&
// i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>));
//foreach (var interfaceType in interfaceTypes)
//{
// services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType));
//}
services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType));
}
}
}
}
}

View File

@ -74,7 +74,6 @@ namespace JiShe.CollectBus.Protocol
}
}
return (tB3761 as T)!;
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
@ -130,11 +131,11 @@ namespace JiShe.CollectBus.Plugins
/// <returns></returns>
private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761)
{
var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (_protocolPlugin == null)
{
_logger.LogError("376.1协议插件不存在!");
}
//var _protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
//if (_protocolPlugin == null)
//{
// _logger.LogError("376.1协议插件不存在!");
//}
//await _producerBus.Publish(new MessageReceived
@ -160,16 +161,44 @@ namespace JiShe.CollectBus.Plugins
// MessageId = NewId.NextGuid().ToString()
//});
//TODO根据AFN进行分流推送到kafka
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
if(tB3761?.AFN_FC?.AFN==null || tB3761.DT?.Fn==null)
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = messageHexString,
DeviceNo = tB3761?.A?.Code!,
MessageId = Guid.NewGuid().ToString()
});
_logger.LogError("376.1协议解析AFN失败");
return;
}
// 登录心跳已做了处理,故需要忽略登录和心跳帧
//if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳)
// return;
//TODO根据AFN进行分流推送到kafka
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761?.AFN_FC?.AFN.ToString().PadLeft(2,'0'));
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
if(topics.Contains(topicName))
await _producerService.ProduceAsync(topicName, new MessageReceived
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = messageHexString,
DeviceNo = tB3761?.A?.Code!,
MessageId = Guid.NewGuid().ToString()
});
else
{
_logger.LogError($"不支持的上报kafka主题{topicName}");
await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
{
ClientId = tcpSessionClient.Id,
ClientIp = tcpSessionClient.IP,
ClientPort = tcpSessionClient.Port,
MessageHexString = messageHexString,
DeviceNo = tB3761?.A?.Code!,
MessageId = Guid.NewGuid().ToString()
});
}
}
}
}

View File

@ -21,6 +21,8 @@ using Volo.Abp.Domain.Repositories;
using System.Collections.Generic;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.Protocol.Dto;
using System.Collections;
namespace JiShe.CollectBus.Subscribers
{
@ -64,26 +66,44 @@ namespace JiShe.CollectBus.Subscribers
//[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
public async Task<ISubscribeAck> LoginIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{
bool isAck = false;
foreach (var issuedEventMessage in issuedEventMessages)
{
var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
if (loginEntity == null)
{
isAck=false;
break;
}
_logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}");
var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
loginEntity.AckTime = Clock.Now;
loginEntity.IsAck = true;
await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
if (_tcpService.ClientExists(issuedEventMessage.ClientId))
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
isAck = true;
}
return SubscribeAck.Success();
// TODO:暂时ACK等后续处理是否放到私信队列中
return isAck? SubscribeAck.Success(): SubscribeAck.Fail();
}
[KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)]
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)]
public async Task<ISubscribeAck> HeartbeatIssuedEvent(List<IssuedEventMessage> issuedEventMessages)
{
bool isAck = false;
foreach (var issuedEventMessage in issuedEventMessages)
{
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId);
if (heartbeatEntity == null)
{
isAck = false;
break;
}
_logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}");
var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId);
heartbeatEntity.AckTime = Clock.Now;
heartbeatEntity.IsAck = true;
await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
@ -92,11 +112,11 @@ namespace JiShe.CollectBus.Subscribers
//{
// await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message);
//}
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
if(_tcpService.ClientExists(issuedEventMessage.ClientId))
await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message);
}
return SubscribeAck.Success();
// TODO:暂时ACK等后续处理是否放到私信队列中
return isAck ? SubscribeAck.Success() : SubscribeAck.Fail();
}
[KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)]
@ -158,19 +178,21 @@ namespace JiShe.CollectBus.Subscribers
//[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)]
public async Task<ISubscribeAck> ReceivedHeartbeatEvent(List<MessageReceivedHeartbeat> receivedHeartbeatMessages)
{
foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
//await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
}
}
//foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages)
//{
// var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
// if (protocolPlugin == null)
// {
// _logger.LogError("协议不存在!");
// }
// else
// {
// //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage);
// await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage);
// }
//}
await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages);
return SubscribeAck.Success();
}
@ -178,21 +200,57 @@ namespace JiShe.CollectBus.Subscribers
//[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)]
public async Task<ISubscribeAck> ReceivedLoginEvent(List<MessageReceivedLogin> receivedLoginMessages)
{
foreach (var receivedLoginMessage in receivedLoginMessages)
//foreach (var receivedLoginMessage in receivedLoginMessages)
//{
//var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
//if (protocolPlugin == null)
//{
// _logger.LogError("协议不存在!");
//}
//else
//{
// //await protocolPlugin.LoginAsync(receivedLoginMessage);
// await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
//}
//}
await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages);
return SubscribeAck.Success();
}
[KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)]
public async Task<ISubscribeAck> ReceivedAFN00Event(MessageReceived receivedMessage)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
{
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null)
_logger.LogError("协议不存在!");
}
else
{
TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString);
if (tB3761 == null)
{
_logger.LogError("协议不存在!");
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
else
if (tB3761.DT == null || tB3761.AFN_FC == null)
{
//await protocolPlugin.LoginAsync(receivedLoginMessage);
await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage);
Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
return SubscribeAck.Success();
}
//string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis";
//var analysisStrategy = _serviceProvider.GetKeyedService<IAnalysisStrategy>($"AFN0_F1_Analysis");
//var data = await analysisStrategy.ExecuteAsync<UnitDataDto<AFN0_F1_AnalysisDto>>(tB3761);
}
return SubscribeAck.Success();
}
}
}

View File

@ -16,7 +16,6 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
<title>后端服务</title>
</head>
<body>