From 7c55d96b7cf8eed2bfc9d3f6b5d8b9036e31d05d Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 22 Apr 2025 16:48:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../KafkaSubscribeExtensions.cs | 2 +- .../Abstracts/ProtocolPlugin.cs | 5 + .../AnalysisStrategyContext.cs | 25 ++++ .../Interfaces/IAnalysisStrategy.cs | 15 ++ .../Models/TB3761.cs | 5 + .../Protocol/Dto/AFN0_F1_AnalysisDto.cs | 13 ++ .../Protocol/Dto/UnitDataDto.cs | 42 ++++++ .../AnalysisData/AFN_00H/AFN0_F1_Analysis.cs | 45 ++++++ .../JiSheCollectBusProtocolModule.cs | 48 ++++++- .../StandardProtocolPlugin.cs | 1 - .../Plugins/TcpMonitor.cs | 57 ++++++-- .../Subscribers/SubscriberAppService.cs | 128 +++++++++++++----- .../Pages/Monitor.cshtml | 1 - 13 files changed, 334 insertions(+), 53 deletions(-) create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IAnalysisStrategy.cs create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/UnitDataDto.cs create mode 100644 protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index 64f8dec..3c50aae 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -65,7 +65,7 @@ namespace JiShe.CollectBus.Kafka // 实现IKafkaSubscribe接口 var subscribeTypes = assembly.GetTypes().Where(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) && - !type.IsAbstract && !type.IsInterface).ToList(); ; + !type.IsAbstract && !type.IsInterface).ToList(); if (subscribeTypes.Count == 0) continue; diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs index bb9115b..b026366 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs @@ -66,6 +66,11 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { TB3761 tB3761 = new TB3761 { + BaseHexMessage = new BaseHexMessage + { + HexMessageString = messageReceived, + HexMessageList = hexStringList + }, C = Analysis_C(hexStringList), A = Analysis_A(hexStringList), AFN_FC = Analysis_AFN_FC(hexStringList), diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs new file mode 100644 index 0000000..283c3ff --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/AnalysisStrategyContext.cs @@ -0,0 +1,25 @@ +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.Contracts +{ + + public class AnalysisStrategyContext + { + private readonly IAnalysisStrategy _analysisStrategy; + + public AnalysisStrategyContext(IAnalysisStrategy analysisStrategy) + { + _analysisStrategy = analysisStrategy; + } + + public Task ExecuteAnalysisStrategy(TInput input) + { + return _analysisStrategy.ExecuteAsync(input); + } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IAnalysisStrategy.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IAnalysisStrategy.cs new file mode 100644 index 0000000..dcd12fb --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IAnalysisStrategy.cs @@ -0,0 +1,15 @@ +using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Dto; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.Contracts.Interfaces +{ + public interface IAnalysisStrategy + { + Task ExecuteAsync(TInput input); + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs index 6ddda15..a24090a 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/TB3761.cs @@ -12,6 +12,11 @@ namespace JiShe.CollectBus.Protocol.Contracts.Models /// public class TB3761 { + /// + /// 报文 + /// + public BaseHexMessage? BaseHexMessage { get; set;} + /// /// 控制域C /// diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs new file mode 100644 index 0000000..d6ab85a --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/AFN0_F1_AnalysisDto.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.Dto +{ + public class AFN0_F1_AnalysisDto: UnitDataDto + { + + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/UnitDataDto.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/UnitDataDto.cs new file mode 100644 index 0000000..cd59203 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Protocol/Dto/UnitDataDto.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.Dto +{ + public class UnitDataDto + { + /// + /// 集中器地址 + /// + public string? Code { get; set; } + + /// + /// AFN功能码 + /// + public int AFN { get; set; } + + /// + /// 信息点 + /// + public int Pn { get; set; } + + /// + /// 信息类 + /// + public int Fn { get; set; } + + /// + /// 数据时标(最近数据时间点的时间),如:8:00 08:15 记录08:15 + /// + public string? DataTime { get; set; } + + /// + /// 密度(分) + /// + public int TimeDensity { get; set; } + } + +} diff --git a/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs new file mode 100644 index 0000000..f5ba2af --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol/AnalysisData/AFN_00H/AFN0_F1_Analysis.cs @@ -0,0 +1,45 @@ +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Dto; +using Microsoft.Extensions.Logging; +using Microsoft.IdentityModel.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.AnalysisData.AFN_00H +{ + public class AFN0_F1_Analysis: IAnalysisStrategy + { + private readonly ILogger _logger; + + public AFN0_F1_Analysis(ILogger logger) + { + _logger = logger; + } + + public Task ExecuteAsync(TB3761 tB3761) + { + try + { + ArgumentNullException.ThrowIfNull(nameof(tB3761)); + AFN0_F1_AnalysisDto dto = new AFN0_F1_AnalysisDto + { + Code = tB3761.A?.Code, + AFN = tB3761.AFN_FC?.AFN ?? 0, + Fn = tB3761.DT?.Fn ?? 0, + Pn = tB3761.DA?.Pn ?? 0 + }; + return Task.FromResult(dto); + } + catch (Exception ex) + { + _logger.LogError(ex, $"00_1解析失败:{tB3761.A?.Code}-{tB3761.DT?.Fn ?? 0}-{tB3761?.BaseHexMessage?.HexMessageString},{ex.Message}"); + return null; + } + } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index a53f1fb..4ceeb77 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -1,7 +1,12 @@ -using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Protocol.AnalysisData; +using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; +using Serilog.Core; +using System.Reflection; using TouchSocket.Core; using Volo.Abp; using Volo.Abp.Modularity; @@ -13,6 +18,7 @@ namespace JiShe.CollectBus.Protocol public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin)); + RegisterProtocolAnalysis(context.Services); } public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) @@ -20,5 +26,45 @@ namespace JiShe.CollectBus.Protocol var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin)); await standardProtocol.AddAsync(); } + + public void RegisterProtocolAnalysis(IServiceCollection services) + { + // 批量注册 + var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); + if (string.IsNullOrWhiteSpace(assemblyPath)) + { + return; + } + var dllFiles = Directory.GetFiles(Path.Combine(assemblyPath, "Plugins") , "*.dll"); + foreach (var file in dllFiles) + { + // 跳过已加载的程序集 + var assemblyName = AssemblyName.GetAssemblyName(file); + var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() + .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); + var assembly = existingAssembly ?? Assembly.LoadFrom(file); + // 实现IAnalysisStrategy接口 + //var analysisStrategyTypes = assembly.GetTypes().Where(t =>!t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => + //i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>))).ToList(); + var analysisStrategyTypes = assembly.GetTypes().Where(type => + typeof(IAnalysisStrategy<,>).IsAssignableFrom(type) && + !type.IsAbstract && !type.IsInterface).ToList(); + if (analysisStrategyTypes.Count == 0) + continue; + foreach (var analysisStrategyType in analysisStrategyTypes) + { + // 取所有接口 + //var interfaceTypes = analysisStrategyType.GetInterfaces() + // .Where(i => i.IsGenericType && + // i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)); + //foreach (var interfaceType in interfaceTypes) + //{ + // services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType)); + //} + services.AddKeyedTransient(analysisStrategyType, nameof(analysisStrategyType)); + } + } + + } } } diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 6c7d4ea..6a5e3a5 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -74,7 +74,6 @@ namespace JiShe.CollectBus.Protocol } } - return (tB3761 as T)!; } diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index e631416..5d2ba4b 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; @@ -130,11 +131,11 @@ namespace JiShe.CollectBus.Plugins /// private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient,string messageHexString, TB3761? tB3761) { - var _protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (_protocolPlugin == null) - { - _logger.LogError("376.1协议插件不存在!"); - } + //var _protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + //if (_protocolPlugin == null) + //{ + // _logger.LogError("376.1协议插件不存在!"); + //} //await _producerBus.Publish(new MessageReceived @@ -160,16 +161,44 @@ namespace JiShe.CollectBus.Plugins // MessageId = NewId.NextGuid().ToString() //}); - //TODO:根据AFN进行分流推送到kafka - await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + if(tB3761?.AFN_FC?.AFN==null || tB3761.DT?.Fn==null) { - ClientId = tcpSessionClient.Id, - ClientIp = tcpSessionClient.IP, - ClientPort = tcpSessionClient.Port, - MessageHexString = messageHexString, - DeviceNo = tB3761?.A?.Code!, - MessageId = Guid.NewGuid().ToString() - }); + _logger.LogError("376.1协议解析AFN失败"); + return; + } + // 登录心跳已做了处理,故需要忽略登录和心跳帧 + //if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳) + // return; + + //TODO:根据AFN进行分流推送到kafka + string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761?.AFN_FC?.AFN.ToString().PadLeft(2,'0')); + + List topics = ProtocolConstExtensions.GetAllTopicNamesByReceived(); + + if(topics.Contains(topicName)) + await _producerService.ProduceAsync(topicName, new MessageReceived + { + ClientId = tcpSessionClient.Id, + ClientIp = tcpSessionClient.IP, + ClientPort = tcpSessionClient.Port, + MessageHexString = messageHexString, + DeviceNo = tB3761?.A?.Code!, + MessageId = Guid.NewGuid().ToString() + }); + else + { + _logger.LogError($"不支持的上报kafka主题:{topicName}"); + await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + { + ClientId = tcpSessionClient.Id, + ClientIp = tcpSessionClient.IP, + ClientPort = tcpSessionClient.Port, + MessageHexString = messageHexString, + DeviceNo = tB3761?.A?.Code!, + MessageId = Guid.NewGuid().ToString() + }); + } + } } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index ee0997d..f1df404 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -21,6 +21,8 @@ using Volo.Abp.Domain.Repositories; using System.Collections.Generic; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.IoTDB.Provider; +using JiShe.CollectBus.Protocol.Dto; +using System.Collections; namespace JiShe.CollectBus.Subscribers { @@ -64,26 +66,44 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] public async Task LoginIssuedEvent(List issuedEventMessages) { + bool isAck = false; foreach (var issuedEventMessage in issuedEventMessages) { + var loginEntity = await _messageReceivedLoginEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); + if (loginEntity == null) + { + isAck=false; + break; + } _logger.LogInformation($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + if (_tcpService.ClientExists(issuedEventMessage.ClientId)) + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + isAck = true; + } - return SubscribeAck.Success(); + // TODO:暂时ACK,等后续处理是否放到私信队列中 + return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName, EnableBatch = true)] //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] public async Task HeartbeatIssuedEvent(List issuedEventMessages) { + bool isAck = false; foreach (var issuedEventMessage in issuedEventMessages) { + var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.FirstOrDefaultAsync(a => a.MessageId == issuedEventMessage.MessageId); + if (heartbeatEntity == null) + { + isAck = false; + break; + } _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); @@ -92,11 +112,11 @@ namespace JiShe.CollectBus.Subscribers //{ // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); //} - - await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + if(_tcpService.ClientExists(issuedEventMessage.ClientId)) + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); } - - return SubscribeAck.Success(); + // TODO:暂时ACK,等后续处理是否放到私信队列中 + return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] @@ -158,19 +178,21 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(List receivedHeartbeatMessages) { - foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) - { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); - await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); - } - } + //foreach (var receivedHeartbeatMessage in receivedHeartbeatMessages) + //{ + // var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + // if (protocolPlugin == null) + // { + // _logger.LogError("协议不存在!"); + // } + // else + // { + // //await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); + // await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); + // } + //} + await _messageReceivedHeartbeatEventRepository.InsertManyAsync(receivedHeartbeatMessages); + return SubscribeAck.Success(); } @@ -178,21 +200,57 @@ namespace JiShe.CollectBus.Subscribers //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(List receivedLoginMessages) { - foreach (var receivedLoginMessage in receivedLoginMessages) - { - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - //await protocolPlugin.LoginAsync(receivedLoginMessage); - await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); - } - } - + //foreach (var receivedLoginMessage in receivedLoginMessages) + //{ + //var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + //if (protocolPlugin == null) + //{ + // _logger.LogError("协议不存在!"); + //} + //else + //{ + // //await protocolPlugin.LoginAsync(receivedLoginMessage); + // await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); + //} + + //} + await _messageReceivedLoginEventRepository.InsertManyAsync(receivedLoginMessages); return SubscribeAck.Success(); } + + + [KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)] + public async Task ReceivedAFN00Event(MessageReceived receivedMessage) + { + + var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + TB3761? tB3761 = protocolPlugin.Analysis3761(receivedMessage.MessageHexString); + if (tB3761 == null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return SubscribeAck.Success(); + } + if (tB3761.DT == null || tB3761.AFN_FC == null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return SubscribeAck.Success(); + } + //string serverName = $"AFN{tB3761.AFN_FC.AFN}_F{tB3761.DT.Fn}_Analysis"; + //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis"); + + //var data = await analysisStrategy.ExecuteAsync>(tB3761); + + } + + return SubscribeAck.Success(); + } + + } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index f7bd68e..afe25da 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,7 +16,6 @@ 后端服务 -