From 2ee205d00bb029dc4390b960511c53a2189e79cf Mon Sep 17 00:00:00 2001
From: zenghongyao <873884283@qq.com>
Date: Sun, 27 Apr 2025 15:44:54 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=9A=82=E5=AD=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../AnalysisData/DataStorage.cs | 4 +-
.../T37612012ProtocolPlugin.cs | 2 +-
.../SubscriberAnalysisAppService.cs | 439 ++++++++++++++++++
.../Subscribers/WorkerSubscriberAppService.cs | 2 +
.../MeterReadingTelemetryPacketInfo.cs | 14 +-
.../Pages/Monitor.cshtml | 1 +
web/JiShe.CollectBus.Host/appsettings.json | 2 +-
7 files changed, 459 insertions(+), 5 deletions(-)
diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
index 6914c3c..9f9183a 100644
--- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
@@ -134,7 +134,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
await _dbProvider.InsertAsync(taskData);
//如果无字段名,则不保存数据
- if (string.IsNullOrWhiteSpace(data.FiledName))
+ if (!string.IsNullOrWhiteSpace(data.FiledName))
{
_runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(meter);
@@ -233,7 +233,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
await _dbProvider.InsertAsync(taskData);
//如果无字段名,则不保存数据
- if (string.IsNullOrWhiteSpace(item.FiledName))
+ if (!string.IsNullOrWhiteSpace(item.FiledName))
{
_runtimeContext.UseTableSessionPool = false; // 使树模型池
await _dbProvider.InsertAsync(meter);
diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
index 1fc8d2a..71b80eb 100644
--- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs
@@ -534,7 +534,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
AFN_FC aFN_FC = new AFN_FC();
try
{
- if (hexStringList.Count == 0)
+ if (hexStringList.Count != 0)
{
BaseHexMessage baseHexMessage = new BaseHexMessage
diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs
index 3fd32fb..d7ec7c1 100644
--- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs
+++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAnalysisAppService.cs
@@ -76,5 +76,444 @@ namespace JiShe.CollectBus.Subscribers
return SubscribeAck.Fail();
}
+ ///
+ /// 解析AFN01H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN01HReceivedEventNameTemp)]
+ public async Task ReceivedAFN01Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN02H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN02HReceivedEventNameTemp)]
+ public async Task ReceivedAFN02Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN03H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN03HReceivedEventNameTemp)]
+ public async Task ReceivedAFN03Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN04H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN04HReceivedEventNameTemp)]
+ public async Task ReceivedAFN04Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN05H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN05HReceivedEventNameTemp)]
+ public async Task ReceivedAFN05Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN09H
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN09HReceivedEventNameTemp)]
+ public async Task ReceivedAFN09Event(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+ ///
+ /// 解析AFN0AH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN0AHReceivedEventNameTemp)]
+ public async Task ReceivedAFN0AEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+ ///
+ /// 解析AFN0BH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN0BHReceivedEventNameTemp)]
+ public async Task ReceivedAFN0BEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+ ///
+ /// 解析AFN0CH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN0CHReceivedEventNameTemp)]
+ public async Task ReceivedAFN0CEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN0DH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN0DHReceivedEventNameTemp)]
+ public async Task ReceivedAFN0DEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN0EH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN0EHReceivedEventNameTemp)]
+ public async Task ReceivedAFN0EEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
+
+
+ ///
+ /// 解析AFN0HH
+ ///
+ ///
+ ///
+ [KafkaSubscribe(ProtocolConst.SubscriberAFN10HReceivedEventNameTemp)]
+ public async Task ReceivedAFN0HEvent(MessageProtocolAnalysis receivedMessage)
+ {
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(receivedMessage.DeviceNo);
+ if (protocolPlugin == null)
+ {
+ _logger.LogError("协议不存在!");
+ }
+ else
+ {
+ if (receivedMessage.Data == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ if (receivedMessage.Data.DT == null || receivedMessage.Data.AFN_FC == null)
+ {
+ Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}");
+ return SubscribeAck.Success();
+ }
+ string serverName = $"AFN{receivedMessage.Data.AFN_FC.AFN}_F{receivedMessage.Data.DT.Fn}_Analysis";
+ //var analysisStrategy = _serviceProvider.GetKeyedService($"AFN0_F1_Analysis");
+
+ //var data = await analysisStrategy.ExecuteAsync>(tB3761);
+ var executor = _serviceProvider.GetRequiredService();
+ await executor.ExecuteAsync(serverName, receivedMessage.Data);
+ }
+
+ return SubscribeAck.Fail();
+ }
}
}
diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
index e34c087..7680d44 100644
--- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
+++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
@@ -117,6 +117,8 @@ namespace JiShe.CollectBus.Subscribers
await _tcpService.SendAsync(receivedMessage.FocusAddress, Convert.FromHexString(receivedMessage.IssuedMessageHexString));
receivedMessage.IsSend = true;
+ receivedMessage.SendNum += 1;
+ receivedMessage.NextSendTime = DateTime.Now.AddMinutes(5);
await _dbProvider.InsertAsync(receivedMessage);
return SubscribeAck.Success();
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
index e35d245..1b5c954 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
@@ -128,6 +128,18 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
[FIELDColumn]
public bool IsSend { get; set; }
+ ///
+ /// 发送次数
+ ///
+ [FIELDColumn]
+ public int SendNum { get; set; }
+
+ ///
+ /// 下次发送时间
+ ///
+ [FIELDColumn]
+ public DateTime? NextSendTime { get; set; }
+
///
/// 创建时间
///
@@ -174,7 +186,7 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
/// 是否已上报
///
[FIELDColumn]
- public bool IsReceived { get; set; }
+ public bool IsReceived { get; set; }
}
}
diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index 3a7fbc7..c5bc286 100644
--- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -16,6 +16,7 @@
后端服务
+
diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json
index 0717f7f..7bb712a 100644
--- a/web/JiShe.CollectBus.Host/appsettings.json
+++ b/web/JiShe.CollectBus.Host/appsettings.json
@@ -141,7 +141,7 @@
"DefaultIdempotence": true
}
},
- "PlugInFolder": "C:\\Users\\Dai Zan\\Desktop\\Plugins",
+ "PlugInFolder": "",
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus100",
"SystemType": null,