From 310ae97a6b20b8718c20720bdb642b8b1be5af6b Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 23 Apr 2025 17:49:13 +0800
Subject: [PATCH 1/6] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8A=A5=E6=96=87?=
=?UTF-8?q?=E5=AD=98=E5=82=A8=E8=AE=B0=E5=BD=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Abstracts/ProtocolPlugin.cs | 11 +++++++++++
.../Interfaces/IProtocolPlugin.cs | 6 ++++++
.../BasicScheduledMeterReadingService.cs | 12 +++++-------
.../EnergySystemScheduledMeterReadingService.cs | 4 ++--
.../MeterReadingTelemetryPacketInfo.cs | 8 ++++----
.../JiShe.CollectBus.Common/Helpers/CommonHelper.cs | 5 +++--
6 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
index eaa081b..da99681 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
@@ -369,5 +369,16 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
///
public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1"));
+ ///
+ /// 组装透明转发报文
+ ///
+ ///
+ ///
+ ///
+ public virtual List GenerateAFN10HContent(T entity)
+ {
+ throw new Exception();
+ }
+
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 326dad0..074fa48 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -18,6 +18,12 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
TB3761? Analysis3761(string messageReceived);
+ ///
+ /// 组装透明转发报文
+ ///
+ ///
+ List GenerateAFN10HContent(T entity);
+
//Task LoginAsync(MessageReceivedLogin messageReceived);
//Task HeartbeatAsync(MessageReceivedHeartbeat messageReceived);
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index f50d121..849f570 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -645,7 +645,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
+ string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq.PRSEQ);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -659,7 +659,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterAddress = ammeterInfo.AmmerterAddress,
AFN = (int)aFN,
Fn = fn,
- //Seq = builderResponse.Seq,
+ Seq = builderResponse.Seq.PRSEQ,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = taskMark,
@@ -963,7 +963,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA);
+ string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq.PRSEQ);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -977,7 +977,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterAddress = watermeter.MeterAddress,
AFN = (int)aFN,
Fn = fn,
- //Seq = builderResponse.Seq,
+ Seq = builderResponse.Seq.PRSEQ,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = taskMark,
@@ -1101,9 +1101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
-
- var ddd = _runtimeContext.UseTableSessionPool;
-
+
do
{
options.PageIndex = pageNumber++;
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 82e9588..27804eb 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -280,7 +280,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA);
+ string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq.PRSEQ);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -294,7 +294,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterAddress = ammeterInfo.AmmerterAddress,
AFN = (int)aFN,
Fn = fn,
- //Seq = builderResponse.Seq,
+ Seq = builderResponse.Seq.PRSEQ,
MSA = builderResponse.MSA,
ItemCode = temCode,
TaskMark = taskMark,
diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
index 07b5369..e39b2bd 100644
--- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
+++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
@@ -104,10 +104,10 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
[FIELDColumn]
public string ItemCode { get; set; }
- /////
- ///// 帧序列域SEQ
- /////
- //public required Seq Seq { get; set; }
+ ///
+ /// 帧序列域 SEQ
+ ///
+ public int Seq { get; set; }
///
/// 地址域A3的主站地址MSA
diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
index 11ba0a6..1f37b10 100644
--- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
+++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
@@ -768,10 +768,11 @@ namespace JiShe.CollectBus.Common.Helpers
///
///
///
+ ///
///
- public static string GetTaskMark(int afn, int fn, int pn, int msa)
+ public static string GetTaskMark(int afn, int fn, int pn, int msa,int seq)
{
- var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}";
+ var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}{msa.ToString().PadLeft(2, '0')}{seq.ToString().PadLeft(2, '0')}";
return makstr;// Convert.ToInt32(makstr) << 32 | msa;
}
From 57021e2b9cbcb24ebd22c8875f91b373f66d6e5a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E7=9B=8A?=
Date: Wed, 23 Apr 2025 23:42:35 +0800
Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Abstracts/ProtocolPlugin.cs | 7 ++-----
.../Interfaces/IProtocolPlugin.cs | 2 +-
.../StandardProtocolPlugin.cs | 11 ++++++++++-
.../BasicScheduledMeterReadingService.cs | 4 ++--
4 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
index da99681..9144c55 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
@@ -373,12 +373,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
/// 组装透明转发报文
///
///
- ///
+ /// 设备数据实体
///
- public virtual List GenerateAFN10HContent(T entity)
- {
- throw new Exception();
- }
+ public abstract Task> GenerateAFN10HContent(T entity) where T : class;
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 074fa48..8da228e 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
/// 组装透明转发报文
///
///
- List GenerateAFN10HContent(T entity);
+ Task> GenerateAFN10HContent(T entity) where T : class;
//Task LoginAsync(MessageReceivedLogin messageReceived);
diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index 27f7466..b2e7a58 100644
--- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -262,7 +262,16 @@ namespace JiShe.CollectBus.Protocol
-
+ ///
+ /// 组装透明转发报文
+ ///
+ ///
+ /// 设备数据实体
+ ///
+ public override async Task> GenerateAFN10HContent(T entity) where T : class
+ {
+ return await Task.FromResult(new List());
+ }
#region 上行命令
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 849f570..a9b1953 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -952,13 +952,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else
{
- _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}无效编码。");
+ _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}无效编码。");
continue;
}
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
- _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
+ _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
From dec99af6dd6e36a7b4defacb96b42c7e16b57792 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Thu, 24 Apr 2025 17:48:20 +0800
Subject: [PATCH 3/6] =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=B1=A0=E7=9A=84?=
=?UTF-8?q?=E5=BA=94=E7=94=A8=E5=A4=84=E7=90=8601?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Abstracts/ProtocolPlugin.cs | 26 ++-
.../Interfaces/IProtocolPlugin.cs | 7 +-
.../Models/BuildRequest.cs | 28 +++
.../Models/BuildResponse.cs | 38 +++++
.../SendData/Telemetry3761PacketBuilder.cs | 73 ++++----
.../SendData/Telemetry3761PacketRequest.cs | 16 +-
.../SendData/Telemetry3761PacketResponse.cs | 23 +++
.../TestProtocolPlugin.cs | 6 +
.../SendData/Telemetry645PacketBuilder.cs | 106 ++++++++++++
.../SendData/Telemetry645PacketRequest.cs | 23 +++
.../SendData/Telemetry645PacketResponse.cs | 13 ++
.../StandardProtocolPlugin.cs | 94 ++++++++--
.../Plugins/TcpMonitor.cs | 1 +
.../Samples/SampleAppService.cs | 1 +
.../BasicScheduledMeterReadingService.cs | 160 +++++++++---------
...nergySystemScheduledMeterReadingService.cs | 73 +++-----
.../BuildSendDatas/Build645SendData.cs | 1 +
.../BuildSendDatas/TelemetryPacketResponse.cs | 30 ----
18 files changed, 484 insertions(+), 235 deletions(-)
create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs
create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs
rename shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs => protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs (73%)
rename shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs => protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs (59%)
create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs
create mode 100644 protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
create mode 100644 protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs
create mode 100644 protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs
delete mode 100644 shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
index 9144c55..9fb645a 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
@@ -1,7 +1,11 @@
-using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.BuildSendDatas;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -9,7 +13,7 @@ using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{
- public abstract class ProtocolPlugin:IProtocolPlugin
+ public abstract class ProtocolPlugin : IProtocolPlugin
{
//头部字节长度
public const int hearderLen = 6;
@@ -20,9 +24,10 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
+
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
{
- _logger = logger;
+ _logger = logger;
_protocolInfoRepository = serviceProvider.GetRequiredService>();
}
@@ -43,7 +48,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
//await _protocolInfoCache.Get()
}
- public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T :class;
+ public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T : class;
///
@@ -78,7 +83,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
DA = Analysis_DA(hexStringList),
DT = Analysis_DT(hexStringList)
};
- return tB3761;
+ return tB3761;
}
}
catch (Exception ex)
@@ -302,7 +307,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
return null;
var da1 = baseHexMessage.HexMessageList[0];
var da2 = baseHexMessage.HexMessageList[1];
- DA da = new DA()
+ DA da = new DA()
{
BaseHexMessage = baseHexMessage,
Pn = CalculatePn(da1, da2)
@@ -369,13 +374,18 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
///
public int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1"));
+ #region 下行命令构建
+
///
- /// 组装透明转发报文
+ /// 组装报文
///
///
/// 设备数据实体
+ /// 映射读取执行方法的Code,例如10_1,表示 10H_F1_00000,10H_F1_00001,统一英文下划线分隔
///
- public abstract Task> GenerateAFN10HContent(T entity) where T : class;
+ public abstract Task BuildAsync(BuildRequest request);
+
+ #endregion
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index 8da228e..cfde2d3 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Models;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
@@ -19,10 +20,12 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
TB3761? Analysis3761(string messageReceived);
///
- /// 组装透明转发报文
+ /// 组装报文
///
+ /// 是否需要转发的扩展协议入参对象
+ /// 映射读取执行方法的Code,例如10_1,表示10H_F1
///
- Task> GenerateAFN10HContent(T entity) where T : class;
+ Task BuildAsync(BuildRequest request);
//Task LoginAsync(MessageReceivedLogin messageReceived);
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs
new file mode 100644
index 0000000..962d533
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs
@@ -0,0 +1,28 @@
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
+{
+ ///
+ /// 报文构建参数
+ ///
+ public class BuildRequest
+ {
+ ///
+ /// 集中器地址
+ ///
+ public required string FocusAddress { get; set; }
+
+ ///
+ /// 抄读计量点,也就是终端电表对应端口
+ ///
+ public int Pn { get; set; }
+
+ ///
+ /// 3761协议构建组合功能码
+ ///
+ public required string ItemCode { get; set; }
+
+ ///
+ /// 集中器转发协议构建组合功能码
+ ///
+ public string SubItemCode { get; set; }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs
new file mode 100644
index 0000000..07d235b
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs
@@ -0,0 +1,38 @@
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
+{
+ ///
+ /// 报文构建返回结果
+ ///
+ public class BuildResponse
+ {
+ ///
+ /// 是否成功
+ ///
+ public bool IsSuccess { get; set; } = false;
+
+ ///
+ /// 帧功能域AFN
+ ///
+ public int AFn { get; set;}
+
+ ///
+ /// 帧功能域FN
+ ///
+ public int Fn { get; set; }
+
+ ///
+ /// 帧序列域SEQ
+ ///
+ public int Seq { get; set; }
+
+ ///
+ /// 地址域A3的主站地址MSA
+ ///
+ public int MSA { get; set; }
+
+ ///
+ /// 报文体
+ ///
+ public byte[] Data { get; set; }
+ }
+}
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs
similarity index 73%
rename from shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs
rename to protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs
index 5db0dd7..1aa2a22 100644
--- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs
@@ -1,51 +1,42 @@
-using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.BuildSendDatas;
+using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
-using System;
-using System.Collections.Generic;
-using System.Data.SqlTypes;
-using System.Linq;
-using System.Net;
using System.Reflection;
-using System.Text;
-using System.Threading.Tasks;
-namespace JiShe.CollectBus.Common.BuildSendDatas
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
{
///
- /// 构建下发报文,只适用与定时抄读
+ /// 构建3761下发报文
///
- public static class TelemetryPacketBuilder
+ public static class Telemetry3761PacketBuilder
{
///
/// 构建报文的委托
- ///
- ///
- ///
- ///
- public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request);
+ ///
+ public delegate Telemetry3761PacketResponse T3761Delegate(Telemetry3761PacketRequest request);
///
/// 编码与方法的映射表
///
- public static readonly Dictionary AFNHandlersDictionary = new();
+ public static readonly Dictionary T3761AFNHandlers = new();
- static TelemetryPacketBuilder()
+ static Telemetry3761PacketBuilder()
{
// 初始化时自动注册所有符合命名规则的方法
- var methods = typeof(TelemetryPacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public);
+ var methods = typeof(Telemetry3761PacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public);
foreach (var method in methods)
{
if (method.Name.StartsWith("AFN") && method.Name.EndsWith("_Fn_Send"))
{
string code = method.Name;
- var delegateInstance = (AFNDelegate)Delegate.CreateDelegate(typeof(AFNDelegate), method);
- AFNHandlersDictionary[code] = delegateInstance;
+ var delegateInstance = (T3761Delegate)Delegate.CreateDelegate(typeof(T3761Delegate), method);
+ T3761AFNHandlers[code] = delegateInstance;
}
}
}
#region AFN_00H 确认∕否认
- public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN00_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -64,13 +55,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_01H 复位命令
- public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN01_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -89,13 +80,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_02H 链路接口检测
- public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN02_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -114,12 +105,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_04H 设置参数
- public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN04_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -138,13 +129,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_05H 控制命令
- public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN05_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -163,12 +154,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_09H 请求终端配置及信息
- public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN09_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -187,13 +178,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0AH 查询参数
- public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN0A_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -212,12 +203,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0CH 请求一类数据
- public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN0C_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -236,12 +227,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN_0DH 请求二类数据
- public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN0D_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -260,12 +251,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#endregion
#region AFN10H 数据转发
- public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request)
+ public static Telemetry3761PacketResponse AFN10_Fn_Send(Telemetry3761PacketRequest request)
{
var reqParameter = new ReqParameter2()
{
@@ -284,7 +275,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
Fn = request.Fn
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit);
- return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, };
+ return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, };
}
#region SpecialAmmeter 特殊电表转发
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs
similarity index 59%
rename from shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs
rename to protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs
index d22f923..df12e09 100644
--- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs
@@ -1,20 +1,14 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JiShe.CollectBus.Common.BuildSendDatas
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
{
///
- /// 报文构建参数
+ /// 构建3761报文参数
///
- public class TelemetryPacketRequest
+ public class Telemetry3761PacketRequest
{
///
/// 集中器地址
///
- public string FocusAddress { get; set; }
+ public required string FocusAddress { get; set; }
///
/// 抄读功能码
@@ -24,7 +18,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
///
/// 抄读计量点,也就是终端电表对应端口
///
- public int Pn { get; set; }
+ public int Pn { get; set; }
///
/// 透明转发单元
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs
new file mode 100644
index 0000000..c27906f
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs
@@ -0,0 +1,23 @@
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
+{
+ ///
+ /// 返回3761报文结果
+ ///
+ public class Telemetry3761PacketResponse
+ {
+ ///
+ /// 帧序列域SEQ
+ ///
+ public int Seq { get; set; }
+
+ ///
+ /// 地址域A3的主站地址MSA
+ ///
+ public int MSA { get; set; }
+
+ ///
+ /// 报文体
+ ///
+ public byte[] Data { get; set; }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
index e291aa0..a7ab4a3 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
@@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
@@ -26,5 +27,10 @@ namespace JiShe.CollectBus.Protocol.Test
{
throw new NotImplementedException();
}
+
+ public override Task BuildAsync(BuildRequest request)
+ {
+ throw new NotImplementedException();
+ }
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
new file mode 100644
index 0000000..870278d
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
@@ -0,0 +1,106 @@
+using FreeSql;
+using JiShe.CollectBus.Common.BuildSendDatas;
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Models;
+using System.Reflection;
+
+namespace JiShe.CollectBus.Protocol.SendData
+{
+ ///
+ /// 构建645-2007下发报文
+ ///
+ public static class Telemetry645PacketBuilder
+ {
+ ///
+ /// 构建报文的委托
+ ///
+ ///
+ ///
+ public delegate Telemetry645PacketResponse T645Delegate(Telemetry645PacketRequest request);
+
+ ///
+ /// 编码与方法的映射表
+ ///
+ public static readonly Dictionary T645ControlHandlers = new();
+
+ static Telemetry645PacketBuilder()
+ {
+ // 初始化时自动注册所有符合命名规则的方法
+ var methods = typeof(Telemetry645PacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public);
+ foreach (var method in methods)
+ {
+ if (method.Name.StartsWith("C") && method.Name.EndsWith("_Send"))
+ {
+ string code = method.Name;
+ var delegateInstance = (T645Delegate)Delegate.CreateDelegate(typeof(T645Delegate), method);
+ T645ControlHandlers[code] = delegateInstance;
+ }
+ }
+ }
+
+ #region 1CH 跳合闸、报警、保电
+
+ ///
+ /// 1CH 跳合闸
+ ///
+ ///
+ ///
+ public static Telemetry645PacketResponse C1C_01_Send(Telemetry645PacketRequest request)
+ {
+ var itemCodeArr = request.ItemCode.Split('_');
+ var c_data = itemCodeArr[0];
+ var n_data = itemCodeArr[1];
+ string password = request.Password;
+ string pwdLevel = "02";
+
+ if (!string.IsNullOrWhiteSpace(password) && password.Contains("|"))
+ {
+ var sp = password.Split('|');
+ password = sp[0];
+ pwdLevel = sp[1];
+ }
+
+ var strDate = DateTime.Now.AddYears(3).ToString("000012ddMMyy").StrAddSpan();//命令有效截止时间
+
+ var strP = password.StrAddSpan().StrReverseOrder();
+ var strSJY = " " + pwdLevel + " " + strP + " 01 00 00 00 " + n_data + " 00 " + strDate;
+ var dataUnit = strSJY.Replace(" ", "").StringToPairs();
+
+ var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
+ return new Telemetry645PacketResponse() { Data = dataList };
+ }
+
+
+ ///
+ /// 1CH 保电
+ ///
+ ///
+ ///
+ public static Telemetry645PacketResponse C1C_02_Send(Telemetry645PacketRequest request)
+ {
+ var itemCodeArr = request.ItemCode.Split('_');
+ var c_data = itemCodeArr[0];
+ var n_data = itemCodeArr[1];
+ string password = request.Password;
+
+ if (!string.IsNullOrWhiteSpace(password) && password.Contains("|"))
+ {
+ var sp = password.Split('|');
+ password = sp[0];
+ }
+
+ var strDate = (n_data + DateTime.Now.AddDays(1).ToString("00000012ddMMyy")).StrAddSpan();
+
+ var strP = password.StrAddSpan().StrReverseOrder();
+
+ var strSJY = " 02 " + strP + " 01 00 00 00 " + strDate;
+
+ var dataUnit = strSJY.Replace(" ", "").StringToPairs();
+
+ var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit);
+ return new Telemetry645PacketResponse() { Data = dataList };
+ }
+ #endregion
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs
new file mode 100644
index 0000000..fd90322
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs
@@ -0,0 +1,23 @@
+namespace JiShe.CollectBus.Protocol.SendData
+{
+ ///
+ /// 构建645报文参数
+ ///
+ public class Telemetry645PacketRequest
+ {
+ ///
+ /// 表地址
+ ///
+ public required string MeterAddress { get; set; }
+
+ ///
+ /// 密码
+ ///
+ public required string Password { get; set; }
+
+ ///
+ /// 操作码
+ ///
+ public required string ItemCode { get; set; }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs
new file mode 100644
index 0000000..7731e0f
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs
@@ -0,0 +1,13 @@
+namespace JiShe.CollectBus.Protocol.SendData
+{
+ ///
+ /// 返回645报文结果
+ ///
+ public class Telemetry645PacketResponse
+ {
+ ///
+ /// 报文体
+ ///
+ public List Data { get; set; }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index b2e7a58..d94a380 100644
--- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -1,4 +1,5 @@
-using DeviceDetectorNET.Parser.Device;
+using Azure.Core;
+using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
@@ -13,6 +14,9 @@ using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
+using JiShe.CollectBus.Protocol.SendData;
+using Mapster;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
@@ -29,17 +33,23 @@ namespace JiShe.CollectBus.Protocol
private readonly IRepository _deviceRepository;
private readonly ITcpService _tcpService;
+
+ public readonly Dictionary T3761AFNHandlers;
+ public readonly Dictionary T645ControlHandlers;
+
///
/// Initializes a new instance of the class.
///
/// The service provider.
- public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger, ITcpService tcpService) : base(serviceProvider, logger)
+ public StandardProtocolPlugin(IServiceProvider serviceProvider, ILogger logger, ITcpService tcpService) : base(serviceProvider, logger)
{
_logger = logger;
//_logger = serviceProvider.GetRequiredService>();
_producerService = serviceProvider.GetRequiredService();
_deviceRepository = serviceProvider.GetRequiredService>();
_tcpService = tcpService;
+ T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers;
+ T645ControlHandlers = Telemetry645PacketBuilder.T645ControlHandlers;
}
public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980");
@@ -72,7 +82,7 @@ namespace JiShe.CollectBus.Protocol
await HeartbeatAsync(client, messageReceived, tB3761.A.Code, tB3761.A.A3?.D1_D7, tB3761.SEQ?.PSEQ);
}
}
-
+
}
}
@@ -88,7 +98,7 @@ namespace JiShe.CollectBus.Protocol
///
///
///
- public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq)
+ public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string oldClientId = $"{client.Id}";
await client.ResetIdAsync(code);
@@ -135,7 +145,7 @@ namespace JiShe.CollectBus.Protocol
AFN = AFN.确认或否认,
FunCode = (int)CFromStationFunCode.链路数据,
PRM = PRM.从动站报文,
- A =code,
+ A = code,
Seq = new Seq()
{
TpV = TpV.附加信息域中无时间标签,
@@ -148,11 +158,12 @@ namespace JiShe.CollectBus.Protocol
Fn = 1
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
- var issuedEventMessage = new IssuedEventMessage
- {
- ClientId = messageReceivedLoginEvent.ClientId,
- DeviceNo = messageReceivedLoginEvent.DeviceNo,
- Message = bytes, Type = IssuedEventType.Login,
+ var issuedEventMessage = new IssuedEventMessage
+ {
+ ClientId = messageReceivedLoginEvent.ClientId,
+ DeviceNo = messageReceivedLoginEvent.DeviceNo,
+ Message = bytes,
+ Type = IssuedEventType.Login,
MessageId = messageReceivedLoginEvent.MessageId
};
//await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId });
@@ -174,7 +185,7 @@ namespace JiShe.CollectBus.Protocol
///
///
///
- public async Task HeartbeatAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq)
+ public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq)
{
string clientId = code;
@@ -240,7 +251,7 @@ namespace JiShe.CollectBus.Protocol
};
var bytes = Build3761SendData.BuildSendCommandBytes(reqParam);
//await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId });
-
+
IssuedEventMessage issuedEventMessage = new IssuedEventMessage
{
ClientId = messageReceivedHeartbeatEvent.ClientId,
@@ -263,14 +274,67 @@ namespace JiShe.CollectBus.Protocol
///
- /// 组装透明转发报文
+ /// 组装报文
///
///
/// 设备数据实体
+ /// 映射读取执行方法的Code,例如10_1,表示 10H_F1_00000,10H_F1_00001,统一英文下划线分隔
///
- public override async Task> GenerateAFN10HContent(T entity) where T : class
+ public override async Task BuildAsync(BuildRequest request)
{
- return await Task.FromResult(new List());
+ var itemCodeArr = request.ItemCode.Split('_');
+ var aFNStr = itemCodeArr[0];
+ var aFN = (AFN)aFNStr.HexToDec();
+ var fn = int.Parse(itemCodeArr[1]);
+ Telemetry3761PacketResponse builderResponse = null;
+
+ List dataUnit = new List();
+ //10H_F1_1CH
+ if (aFNStr == "10" && string.IsNullOrWhiteSpace(request.SubItemCode) == false)
+ {
+
+ var subItem = $"C{request.SubItemCode}_Send";
+ Telemetry645PacketResponse t645PacketResponse = null;
+
+ if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(subItem
+ , out var cchandler))
+ {
+ t645PacketResponse = cchandler(new Telemetry645PacketRequest()
+ {
+ MeterAddress = "",
+ Password = "",
+ ItemCode = "",
+ });
+ }
+
+ if (t645PacketResponse != null)
+ {
+ dataUnit = t645PacketResponse.Data;
+ }
+ }
+
+ string afnMethonCode = $"AFN{aFNStr}_Fn_Send";
+ if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode
+ , out var handler))
+ {
+ builderResponse = handler(new Telemetry3761PacketRequest()
+ {
+ FocusAddress = request.FocusAddress,
+ Fn = fn,
+ Pn = request.Pn,
+ DataUnit = dataUnit,
+ });
+ }
+
+ if (builderResponse == null)
+ {
+ return new BuildResponse();
+ }
+
+ var result = builderResponse.Adapt();
+ result.IsSuccess = true;
+
+ return await Task.FromResult(result);
}
diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
index 440239b..519a6e7 100644
--- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
+++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs
@@ -11,6 +11,7 @@ using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
+using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index d871ecd..c4a7472 100644
--- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -11,6 +11,7 @@ using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
+using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal;
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index a9b1953..6e87ddb 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
@@ -19,6 +20,7 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -27,6 +29,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
+using static System.Runtime.CompilerServices.RuntimeHelpers;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -41,8 +44,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
- private readonly IServiceProvider _serviceProvider;
-
+ private readonly IProtocolService _protocolService;
+
int pageSize = 3000;
public BasicScheduledMeterReadingService(
@@ -51,7 +54,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
- IServiceProvider serviceProvider,
+ IProtocolService protocolService,
IOptions kafkaOptions)
{
_logger = logger;
@@ -60,7 +63,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
- _serviceProvider = serviceProvider;
+ _protocolService = protocolService;
_runtimeContext.UseTableSessionPool = true;
}
@@ -145,15 +148,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
- taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
+ taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
- var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
+ var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _dbProvider.BatchInsertAsync(metadata, tempTask);
+ _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@@ -163,16 +166,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.WaterMeter,
- taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
+ taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
- var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
+ var tempTask = await WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
- _dbProvider.BatchInsertAsync(metadata, tempTask);
+ _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
});
}
else
@@ -500,11 +503,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 采集频率对应的时间戳
///
- private List AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
+ private async Task> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
- var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
+ //根据电表型号获取协议插件
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
+ if (protocolPlugin == null)
+ {
+ //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
+ //return;
+ }
+
+
+
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
@@ -601,43 +613,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- var itemCodeArr = tempItem.Split('_');
- var aFNStr = itemCodeArr[0];
- var aFN = (AFN)aFNStr.HexToDec();
- var fn = int.Parse(itemCodeArr[1]);
- TelemetryPacketResponse builderResponse = null;
- if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据)
- {
- //实时数据
- builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest()
- {
- FocusAddress = ammeterInfo.FocusAddress,
- Fn = fn,
- Pn = ammeterInfo.MeteringCode
- });
- }
- else
- {
- string methonCode = $"AFN{aFNStr}_Fn_Send";
- //特殊表暂不处理
- if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
- , out var handler))
- {
- builderResponse = handler(new TelemetryPacketRequest()
- {
- FocusAddress = ammeterInfo.FocusAddress,
- Fn = fn,
- Pn = ammeterInfo.MeteringCode
- });
- }
- else
- {
- _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
- continue;
- }
- }
- //TODO:特殊表
+ //var itemCodeArr = tempItem.Split('_');
+ //var aFNStr = itemCodeArr[0];
+ //var aFN = (AFN)aFNStr.HexToDec();
+ //var fn = int.Parse(itemCodeArr[1]);
+ //TODO:特殊表
+ BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
+ {
+ FocusAddress = ammeterInfo.FocusAddress,
+ Pn = ammeterInfo.MeteringCode,
+ ItemCode = tempItem,
+ });
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
@@ -645,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq.PRSEQ);
+ string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -657,9 +644,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
- AFN = (int)aFN,
- Fn = fn,
- Seq = builderResponse.Seq.PRSEQ,
+ AFN = builderResponse.AFn,
+ Fn = builderResponse.Fn,
+ Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = taskMark,
@@ -851,10 +838,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 时间格式的任务批次名称
///
- private List WatermeterCreatePublishTaskAction(int timeDensity
+ private async Task> WatermeterCreatePublishTaskAction(int timeDensity
, WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
{
- var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
var currentTime = DateTime.Now;
string typeName;
@@ -912,10 +898,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
tempCodes = new List() { "10_1" };
}
- var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
+ //根据电表型号获取协议插件
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
if (protocolPlugin == null)
{
- _logger.LogError("协议不存在!");
+ //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
+ //return;
}
foreach (var tempItem in tempCodes)
@@ -931,28 +919,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- var itemCodeArr = tempItem.Split('_');
- var aFNStr = itemCodeArr[0];
- var aFN = (AFN)aFNStr.HexToDec();
- var fn = int.Parse(itemCodeArr[1]);
- TelemetryPacketResponse builderResponse = null;
+ //var itemCodeArr = tempItem.Split('_');
+ //var aFNStr = itemCodeArr[0];
+ //var aFN = (AFN)aFNStr.HexToDec();
+ //var fn = int.Parse(itemCodeArr[1]);
+ //TelemetryPacketResponse builderResponse = null;
- string methonCode = $"AFN{aFNStr}_Fn_Send";
- //特殊表暂不处理
- if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
- , out var handler))
+ //string methonCode = $"AFN{aFNStr}_Fn_Send";
+ ////特殊表暂不处理
+ //if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
+ // , out var handler))
+ //{
+ // builderResponse = handler(new TelemetryPacketRequest()
+ // {
+ // FocusAddress = watermeter.FocusAddress,
+ // Fn = fn,
+ // Pn = watermeter.MeteringCode,
+ // DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address),
+ // });
+ //}
+ //else
+ //{
+ // _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}无效编码。");
+ // continue;
+ //}
+
+ BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
{
- builderResponse = handler(new TelemetryPacketRequest()
- {
- FocusAddress = watermeter.FocusAddress,
- Fn = fn,
- Pn = watermeter.MeteringCode,
- DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address),
- });
- }
- else
+ FocusAddress = watermeter.FocusAddress,
+ Pn = watermeter.MeteringCode,
+ ItemCode = tempItem,
+ });
+ if (builderResponse == null || builderResponse.Data.Length <= 0)
{
- _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}无效编码。");
+ //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
@@ -963,7 +963,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq.PRSEQ);
+ string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -975,9 +975,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
- AFN = (int)aFN,
- Fn = fn,
- Seq = builderResponse.Seq.PRSEQ,
+ AFN = builderResponse.AFn,
+ Fn = builderResponse.Fn,
+ Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = tempItem,
TaskMark = taskMark,
@@ -1101,7 +1101,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
-
+
do
{
options.PageIndex = pageNumber++;
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 27804eb..d61a641 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -16,6 +16,7 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -38,7 +39,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string serverTagName = string.Empty;
private readonly ILogger _logger;
private readonly IIoTDbProvider _dbProvider;
- private readonly IServiceProvider _serviceProvider;
+ private readonly IProtocolService _protocolService;
public EnergySystemScheduledMeterReadingService(
ILogger logger,
@@ -46,20 +47,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IOptions kafkaOptions,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
- IServiceProvider serviceProvider,
+ IProtocolService protocolService,
IRedisDataCacheService redisDataCacheService)
: base(logger,
producerService,
redisDataCacheService,
dbProvider,
runtimeContext,
- serviceProvider,
+ protocolService,
kafkaOptions)
{
serverTagName = kafkaOptions.Value.ServerTagName;
_dbProvider = dbProvider;
_logger = logger;
- _serviceProvider = serviceProvider;
+ _protocolService = protocolService;
}
public sealed override string SystemType => SystemTypeConst.Energy;
@@ -178,14 +179,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
///
public override async Task AmmeterScheduledAutoValveControl()
- {
- var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
-
+ {
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm}";
try
{
+ //获取电表阀控配置
var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr);
if (settingInfos == null || settingInfos.Count <= 0)
@@ -242,45 +242,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
continue;
}
-
- var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
+
+ var temCode = "10_01_";
+
+ //根据电表型号获取协议插件
+ var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
- _logger.LogError("协议不存在!");
+ _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
+ return;
}
- var temCode = "10_98";
- var itemCodeArr = temCode.Split('_');
- var aFNStr = itemCodeArr[0];
- var aFN = (AFN)(aFNStr.HexToDec());
- var fn = int.Parse(itemCodeArr[1]);
- TelemetryPacketResponse builderResponse = null;
- string methonCode = $"AFN{aFNStr}_Fn_Send";
- //特殊表暂不处理
- if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
- , out var handler))
+ BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
{
- builderResponse = handler(new TelemetryPacketRequest()
- {
- FocusAddress = ammeterInfo.FocusAddress,
- Fn = fn,
- Pn = ammeterInfo.MeteringCode,
- DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文
- });
- }
- else
- {
- _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103");
- continue;
- }
-
- if (builderResponse == null)
- {
- _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104");
- continue;
- }
-
- string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq.PRSEQ);
+ FocusAddress = ammeterInfo.FocusAddress,
+ Pn = ammeterInfo.MeteringCode,
+ ItemCode = temCode,
+ });
+
+ string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
@@ -292,9 +272,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = currentTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
- AFN = (int)aFN,
- Fn = fn,
- Seq = builderResponse.Seq.PRSEQ,
+ AFN = builderResponse.AFn,
+ Fn = builderResponse.Fn,
+ Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = temCode,
TaskMark = taskMark,
@@ -310,7 +290,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (taskList == null || taskList.Count <= 0)
{
- _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105");
+ _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return;
}
@@ -334,9 +314,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw;
}
-
-
- throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
///
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs
index 6c4e960..023cb7f 100644
--- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs
+++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs
@@ -69,6 +69,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
var strP = password.StrAddSpan().StrReverseOrder();
var strSJY = " " + pwdLevel + " " + strP + " 01 00 00 00 " + code + " 00 " + strDate;
var dataUnit = strSJY.Replace(" ", "").StringToPairs();
+
var dataList = Build645SendCommand(ammeterAddress, "1C", dataUnit);
return dataList;
diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs
deleted file mode 100644
index 8cd964a..0000000
--- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs
+++ /dev/null
@@ -1,30 +0,0 @@
-using JiShe.CollectBus.Common.Models;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JiShe.CollectBus.Common.BuildSendDatas
-{
- ///
- /// 报文构建返回结果
- ///
- public class TelemetryPacketResponse
- {
- ///
- /// 帧序列域SEQ
- ///
- public required Seq Seq { get; set; }
-
- ///
- /// 地址域A3的主站地址MSA
- ///
- public int MSA { get; set; }
-
- ///
- /// 报文体
- ///
- public required byte[] Data { get; set; }
- }
-}
From dbe31c87da439fdc5f913221a25ce17526c2730d Mon Sep 17 00:00:00 2001
From: cli <377476583@qq.com>
Date: Thu, 24 Apr 2025 21:01:01 +0800
Subject: [PATCH 4/6] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E9=87=8D=E8=BD=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Abstracts/ProtocolPlugin.cs | 8 +-
.../JiSheCollectBusTestProtocolModule.cs | 24 ++-
.../TestProtocolPlugin.cs | 2 +-
.../JiSheCollectBusProtocolModule.cs | 10 +-
.../Samples/TestAppService.cs | 11 +-
.../DynamicModule/DynamicModuleManager.cs | 182 ++++++++++++++++++
.../DynamicModule/IDynamicModuleManager.cs | 45 +++++
7 files changed, 274 insertions(+), 8 deletions(-)
create mode 100644 shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs
create mode 100644 shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
index a323787..a196661 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs
@@ -1,6 +1,8 @@
using System;
using System.Reflection;
+using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
@@ -22,10 +24,13 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
private readonly ILogger _logger;
private readonly IRepository _protocolInfoRepository;
+ private readonly IFreeRedisProvider _redisProvider;
+
public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger)
{
_logger = logger;
_protocolInfoRepository = serviceProvider.GetRequiredService>();
+ _redisProvider = serviceProvider.GetRequiredService();
}
@@ -42,7 +47,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name);
await _protocolInfoRepository.InsertAsync(Info);
- //await _protocolInfoCache.Get()
+ await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name);
+ await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info);
}
public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T :class;
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
index 314c3ab..225179a 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs
@@ -1,5 +1,6 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
+using System;
using Volo.Abp;
using Volo.Abp.Modularity;
@@ -15,9 +16,30 @@ namespace JiShe.CollectBus.Protocol.Test
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
- Console.WriteLine("TestProtocolPlugin Initialization");
+ Console.WriteLine("TestProtocolPlugin OnApplicationInitializationAsync");
var protocol = context.ServiceProvider.GetRequiredKeyedService(nameof(TestProtocolPlugin));
+ RemoveServiceAtRuntime(context.ServiceProvider);
await protocol.LoadAsync();
}
+
+ public override void OnApplicationShutdown(ApplicationShutdownContext context)
+ {
+ Console.WriteLine("TestProtocolPlugin OnApplicationShutdown");
+ base.OnApplicationShutdown(context);
+ }
+
+ public void RemoveServiceAtRuntime(IServiceProvider serviceProvider)
+ {
+ var services = serviceProvider.GetService();
+ services?.AddKeyedSingleton(nameof(TestProtocolPlugin));
+
+
+ //var services = (IServiceCollection)serviceProvider.GetService(typeof(IServiceCollection));
+ //var pluginService = serviceProvider.GetKeyedService(nameof(TestProtocolPlugin));
+ //if (services != null && pluginService!=null)
+ //{
+ // services.Remove(pluginService);
+ //}
+ }
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
index e291aa0..50f5b0c 100644
--- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs
@@ -20,7 +20,7 @@ namespace JiShe.CollectBus.Protocol.Test
{
}
- public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-Test");
+ public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-TEST");
public override Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null)
{
diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
index 8009165..dab94d8 100644
--- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
+++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs
@@ -11,7 +11,6 @@ using Microsoft.Extensions.Logging;
using Serilog.Core;
using System;
using System.Reflection;
-using TouchSocket.Core;
using Volo.Abp;
using Volo.Abp.Modularity;
@@ -21,7 +20,6 @@ namespace JiShe.CollectBus.Protocol
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
- Console.WriteLine("StandardProtocolPlugin ConfigureServices");
context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin));
//RegisterProtocolAnalysis(context.Services);
LoadAnalysisStrategy(context.Services);
@@ -29,11 +27,17 @@ namespace JiShe.CollectBus.Protocol
public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context)
{
- Console.WriteLine("StandardProtocolPlugin Initialization");
+ Console.WriteLine("StandardProtocolPlugin OnApplicationInitializationAsync");
var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin));
await standardProtocol.LoadAsync();
}
+ public override void OnApplicationShutdown(ApplicationShutdownContext context)
+ {
+ Console.WriteLine("StandardProtocolPlugin OnApplicationShutdown");
+ base.OnApplicationShutdown(context);
+ }
+
public void LoadAnalysisStrategy(IServiceCollection services)
{
var assembly = Assembly.GetExecutingAssembly();
diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
index b054779..f5f2e6b 100644
--- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs
@@ -23,11 +23,13 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using Volo.Abp.Domain.Repositories;
using System.Diagnostics;
using System.Linq;
+using System.Reflection;
using Cassandra;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Protocols;
using TouchSocket.Core;
using Volo.Abp.Modularity;
+using JiShe.CollectBus.DynamicModule;
namespace JiShe.CollectBus.Samples;
@@ -39,18 +41,20 @@ public class TestAppService : CollectBusAppService
private readonly ICassandraProvider _cassandraProvider;
private readonly IProtocolService _protocolService;
private readonly IServiceProvider _serviceProvider;
+ private readonly IDynamicModuleManager _dynamicModuleManager;
public TestAppService(
ILogger logger,
ICassandraRepository messageReceivedCassandraRepository,
- ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider)
+ ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager)
{
_logger = logger;
_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
_cassandraProvider = cassandraProvider;
_protocolService = protocolService;
_serviceProvider = serviceProvider;
+ _dynamicModuleManager = dynamicModuleManager;
}
public async Task AddMessageOfCassandra()
{
@@ -146,6 +150,9 @@ public class TestAppService : CollectBusAppService
// 重新加载插件方法
public async Task ReloadPluginsAsync()
{
- //_serviceProvider.GetService<>()
+ var aa = Assembly.LoadFile(
+ @"D:\Codes\CollectBusV5\JiShe.CollectBus\web\JiShe.CollectBus.Host\bin\Debug\net8.0\Plugins\JiShe.CollectBus.Protocol.Test.dll");
+ var module = aa.GetTypes().First(a=> typeof(IAbpModule).IsAssignableFrom(a));
+ await _dynamicModuleManager.ReinitializeModuleAsync(module);
}
}
diff --git a/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs
new file mode 100644
index 0000000..6bbc8e7
--- /dev/null
+++ b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs
@@ -0,0 +1,182 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using System;
+using System.Linq;
+using System.Reflection;
+using System.Threading.Tasks;
+using Volo.Abp;
+using Volo.Abp.DependencyInjection;
+using Volo.Abp.Modularity;
+
+namespace JiShe.CollectBus.DynamicModule
+{
+ ///
+ /// 动态模块管理器的实现
+ ///
+ public class DynamicModuleManager : IDynamicModuleManager, ISingletonDependency
+ {
+ private readonly IModuleContainer _moduleContainer;
+ private readonly IServiceProvider _serviceProvider;
+
+ public DynamicModuleManager(
+ IModuleContainer moduleContainer,
+ IServiceProvider serviceProvider)
+ {
+ _moduleContainer = moduleContainer;
+ _serviceProvider = serviceProvider;
+ }
+
+ public Type[] GetRegisteredModuleTypes()
+ {
+ return _moduleContainer.Modules.Select(m => m.Type).ToArray();
+ }
+
+ public async Task InitializeModuleAsync(Type moduleType)
+ {
+ //if (!typeof(IAbpModule).IsAssignableFrom(moduleType))
+ //{
+ // throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType));
+ //}
+
+ //var module = (IAbpModule)Activator.CreateInstance(moduleType);
+
+ //// 配置服务
+ //var configureServicesMethod = moduleType.GetMethod("ConfigureServices",
+ // BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
+
+ //if (configureServicesMethod != null)
+ //{
+ // var serviceConfigurationContext = CreateServiceConfigurationContext();
+ // configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext });
+ //}
+
+ //await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) });
+
+
+ }
+
+ public async Task ReinitializeModuleAsync(Type moduleType)
+ {
+ if (!typeof(IAbpModule).IsAssignableFrom(moduleType))
+ {
+ throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType));
+ }
+
+ var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type.Name == moduleType.Name);
+ if (moduleDescriptor == null)
+ {
+ throw new InvalidOperationException($"找不到类型为 {moduleType.FullName} 的模块");
+ }
+
+ var module = moduleDescriptor.Instance;
+
+ await CallModuleMethodAsync(module, "OnApplicationShutdown", new object[] { new ApplicationShutdownContext(_serviceProvider) });
+ //var configureServicesMethod = moduleType.GetMethod("ConfigureServices",
+ // BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy);
+
+ //if (configureServicesMethod != null)
+ //{
+ // var serviceConfigurationContext = CreateServiceConfigurationContext();
+ // configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext });
+ //}
+ await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) });
+ }
+
+ public async Task UnloadModuleAsync(Type moduleType)
+ {
+ if (!typeof(IAbpModule).IsAssignableFrom(moduleType))
+ {
+ throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType));
+ }
+
+ var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type.Name == moduleType.Name);
+ if (moduleDescriptor == null)
+ {
+ throw new InvalidOperationException($"找不到类型为 {moduleType.FullName} 的模块");
+ }
+
+ var module = moduleDescriptor.Instance;
+
+ await CallModuleMethodAsync(module, "OnApplicationShutdown", new object[] { new ApplicationShutdownContext(_serviceProvider) });
+ moduleDescriptor.GetType().GetProperty("IsUnloaded", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)?
+ .SetValue(moduleDescriptor, true);
+ }
+
+ public bool IsModuleLoaded(Type moduleType)
+ {
+ var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type == moduleType);
+ if (moduleDescriptor == null)
+ {
+ return false;
+ }
+
+ // 检查模块是否已被标记为卸载
+ var isUnloaded = (bool?)moduleDescriptor.GetType().GetProperty("IsUnloaded", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)?
+ .GetValue(moduleDescriptor) ?? false;
+
+ return !isUnloaded;
+ }
+
+ public Type GetModuleTypeByName(string moduleName)
+ {
+ return _moduleContainer.Modules
+ .FirstOrDefault(m => m.Type.Name == moduleName || m.Type.FullName == moduleName)?
+ .Type;
+ }
+
+ private async Task CallModuleMethodAsync(IAbpModule module, string methodName, object[] parameters)
+ {
+ var method = module.GetType().GetMethod(methodName);
+ if (method == null)
+ {
+ return;
+ }
+
+ if (method.ReturnType == typeof(Task))
+ {
+ await (Task)method.Invoke(module, parameters);
+ }
+ else
+ {
+ method.Invoke(module, parameters);
+ }
+ }
+
+ private ServiceConfigurationContext CreateServiceConfigurationContext()
+ {
+ // 反射获取内部构造函数
+ var contextType = typeof(ServiceConfigurationContext);
+ var constructor = contextType.GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).FirstOrDefault();
+
+ if (constructor != null)
+ {
+ // 创建新的服务集合并添加现有服务
+ var services = new ServiceCollection();
+ foreach (var service in ((IServiceCollection)_serviceProvider.GetService()) ?? new ServiceCollection())
+ {
+ services.Add(service);
+ }
+
+ return (ServiceConfigurationContext)constructor.Invoke(new object[] { services });
+ }
+
+ // 如果反射失败,使用备选方案
+ try
+ {
+ var bindingFlags = BindingFlags.NonPublic | BindingFlags.Instance;
+ var services = new ServiceCollection();
+
+ var context = Activator.CreateInstance(contextType, bindingFlags, null, null, null);
+
+ var servicesProperty = contextType.GetProperty("Services");
+ servicesProperty?.SetValue(context, services);
+
+ return (ServiceConfigurationContext)context;
+ }
+ catch
+ {
+ throw new InvalidOperationException("无法创建ServiceConfigurationContext。这可能是由于与ABP框架版本不兼容造成的。");
+ }
+ }
+ }
+}
diff --git a/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs
new file mode 100644
index 0000000..3f87820
--- /dev/null
+++ b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.DynamicModule
+{
+ ///
+ /// 提供动态管理ABP模块的功能
+ ///
+ public interface IDynamicModuleManager
+ {
+ ///
+ /// 获取已注册的模块类型
+ ///
+ /// 当前应用程序中已注册的所有模块类型
+ Type[] GetRegisteredModuleTypes();
+
+ ///
+ /// 重新初始化指定的模块
+ ///
+ /// 要重新初始化的模块类型
+ /// 表示异步操作的任务
+ Task ReinitializeModuleAsync(Type moduleType);
+
+ ///
+ /// 卸载指定的模块
+ ///
+ /// 要卸载的模块类型
+ /// 表示异步操作的任务
+ Task UnloadModuleAsync(Type moduleType);
+
+ ///
+ /// 检查模块是否已加载
+ ///
+ /// 要检查的模块类型
+ /// 如果模块已加载,则为true;否则为false
+ bool IsModuleLoaded(Type moduleType);
+
+ ///
+ /// 根据模块名称获取模块类型
+ ///
+ /// 模块名称
+ /// 模块类型,如果找不到则为null
+ Type GetModuleTypeByName(string moduleName);
+ }
+}
\ No newline at end of file
From e1853b2655f73df76d4173e63c75123b5d3dfb1a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=99=88=E7=9B=8A?=
Date: Thu, 24 Apr 2025 23:39:39 +0800
Subject: [PATCH 5/6] =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=B1=A0=E5=BA=94?=
=?UTF-8?q?=E7=94=A802?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Interfaces/IProtocolPlugin.cs | 2 +-
...uildRequest.cs => ProtocolBuildRequest.cs} | 10 ++--
...ldResponse.cs => ProtocolBuildResponse.cs} | 2 +-
.../Models/SubProtocolBuildRequest.cs | 29 +++++++++
.../SendData/Telemetry645PacketBuilder.cs | 2 +-
.../StandardProtocolPlugin.cs | 39 ++++++------
.../BasicScheduledMeterReadingService.cs | 13 +++-
...nergySystemScheduledMeterReadingService.cs | 17 +++++-
.../Subscribers/WorkerSubscriberAppService.cs | 1 +
.../Consts/T645PacketItemCodeConst.cs | 59 +++++++++++++++++++
10 files changed, 141 insertions(+), 33 deletions(-)
rename protocols/JiShe.CollectBus.Protocol.Contracts/Models/{BuildRequest.cs => ProtocolBuildRequest.cs} (67%)
rename protocols/JiShe.CollectBus.Protocol.Contracts/Models/{BuildResponse.cs => ProtocolBuildResponse.cs} (95%)
create mode 100644 protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs
create mode 100644 shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
index cfde2d3..9f6ee76 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs
@@ -25,7 +25,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
/// 是否需要转发的扩展协议入参对象
/// 映射读取执行方法的Code,例如10_1,表示10H_F1
///
- Task BuildAsync(BuildRequest request);
+ Task BuildAsync(ProtocolBuildRequest request);
//Task LoginAsync(MessageReceivedLogin messageReceived);
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs
similarity index 67%
rename from protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs
rename to protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs
index 962d533..31b386b 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildRequest.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs
@@ -1,9 +1,11 @@
-namespace JiShe.CollectBus.Protocol.Contracts.SendData
+using JiShe.CollectBus.Protocol.Contracts.Models;
+
+namespace JiShe.CollectBus.Protocol.Contracts.SendData
{
///
/// 报文构建参数
///
- public class BuildRequest
+ public class ProtocolBuildRequest
{
///
/// 集中器地址
@@ -21,8 +23,8 @@
public required string ItemCode { get; set; }
///
- /// 集中器转发协议构建组合功能码
+ /// 集中器转发协议构建构建参数
///
- public string SubItemCode { get; set; }
+ public SubProtocolBuildRequest SubProtocolRequest { get; set; }
}
}
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs
similarity index 95%
rename from protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs
rename to protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs
index 07d235b..09afc47 100644
--- a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/BuildResponse.cs
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs
@@ -3,7 +3,7 @@
///
/// 报文构建返回结果
///
- public class BuildResponse
+ public class ProtocolBuildResponse
{
///
/// 是否成功
diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs
new file mode 100644
index 0000000..c104ea7
--- /dev/null
+++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Protocol.Contracts.Models
+{
+ ///
+ /// 子协议构建参数
+ ///
+ public class SubProtocolBuildRequest
+ {
+ ///
+ /// 表地址
+ ///
+ public required string MeterAddress { get; set; }
+
+ ///
+ /// 密码
+ ///
+ public required string Password { get; set; }
+
+ ///
+ /// 操作码
+ ///
+ public required string ItemCode { get; set; }
+ }
+}
diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
index 870278d..a7faf67 100644
--- a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
+++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs
@@ -77,7 +77,7 @@ namespace JiShe.CollectBus.Protocol.SendData
///
///
///
- public static Telemetry645PacketResponse C1C_02_Send(Telemetry645PacketRequest request)
+ public static Telemetry645PacketResponse C1C_03_Send(Telemetry645PacketRequest request)
{
var itemCodeArr = request.ItemCode.Split('_');
var c_data = itemCodeArr[0];
diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
index d94a380..faf3f8c 100644
--- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
+++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs
@@ -1,6 +1,4 @@
-using Azure.Core;
-using DeviceDetectorNET.Parser.Device;
-using JiShe.CollectBus.Common.BuildSendDatas;
+using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
@@ -12,14 +10,12 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
-using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using JiShe.CollectBus.Protocol.SendData;
using Mapster;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
-using Newtonsoft.Json.Linq;
using TouchSocket.Sockets;
using Volo.Abp.Domain.Repositories;
@@ -276,12 +272,14 @@ namespace JiShe.CollectBus.Protocol
///
/// 组装报文
///
- ///
- /// 设备数据实体
- /// 映射读取执行方法的Code,例如10_1,表示 10H_F1_00000,10H_F1_00001,统一英文下划线分隔
+ /// 报文构建参数
///
- public override async Task BuildAsync(BuildRequest request)
+ public override async Task BuildAsync(ProtocolBuildRequest request)
{
+ if (request == null)
+ {
+ throw new Exception($"{nameof(StandardProtocolPlugin)} 报文构建失败,参数为空");
+ }
var itemCodeArr = request.ItemCode.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
@@ -289,21 +287,20 @@ namespace JiShe.CollectBus.Protocol
Telemetry3761PacketResponse builderResponse = null;
List dataUnit = new List();
- //10H_F1_1CH
- if (aFNStr == "10" && string.IsNullOrWhiteSpace(request.SubItemCode) == false)
+ //数据转发场景 10H_F1_1CH
+ if (aFNStr == "10" && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false)
{
-
- var subItem = $"C{request.SubItemCode}_Send";
+ var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send";
Telemetry645PacketResponse t645PacketResponse = null;
- if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(subItem
- , out var cchandler))
+ if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName
+ , out var t645PacketHandler))
{
- t645PacketResponse = cchandler(new Telemetry645PacketRequest()
+ t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest()
{
- MeterAddress = "",
- Password = "",
- ItemCode = "",
+ MeterAddress = request.SubProtocolRequest.MeterAddress,
+ Password = request.SubProtocolRequest.Password,
+ ItemCode = request.SubProtocolRequest.ItemCode,
});
}
@@ -328,10 +325,10 @@ namespace JiShe.CollectBus.Protocol
if (builderResponse == null)
{
- return new BuildResponse();
+ return new ProtocolBuildResponse();
}
- var result = builderResponse.Adapt();
+ var result = builderResponse.Adapt();
result.IsSuccess = true;
return await Task.FromResult(result);
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 6e87ddb..4a7d74c 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -20,6 +20,7 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -619,7 +620,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//var fn = int.Parse(itemCodeArr[1]);
//TODO:特殊表
- BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
+ ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
@@ -898,7 +899,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
tempCodes = new List() { "10_1" };
}
- //根据电表型号获取协议插件
+ //根据表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
if (protocolPlugin == null)
{
@@ -944,11 +945,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// continue;
//}
- BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
+ ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = tempItem,
+ SubProtocolRequest = new SubProtocolBuildRequest()
+ {
+ MeterAddress = watermeter.MeterAddress,
+ Password = watermeter.Password,
+ ItemCode = tempItem,
+ }
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index d61a641..e2bae0a 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -16,6 +16,7 @@ using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
+using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.DependencyInjection;
@@ -226,16 +227,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取对应的缓存电表信息
var ammeterInfo = ammeterInfos.First();
bool tripStateResult = false;
+ string subItemCode = string.Empty;
if (settingInfo.TripType.Equals("on"))
{
ammeterInfo.TripState = 0;
tripStateResult = true;
+ subItemCode = T645PacketItemCodeConst.C1C01C;
+ if (ammeterInfo.TypeName != 1)
+ {
+ subItemCode = T645PacketItemCodeConst.C1C01B;
+ }
}
else if (settingInfo.TripType.Equals("off"))
{
ammeterInfo.TripState = 1;
tripStateResult = false;
-
+ subItemCode = T645PacketItemCodeConst.C1C01A;
}
else
{
@@ -253,11 +260,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return;
}
- BuildResponse builderResponse = await protocolPlugin.BuildAsync(new BuildRequest()
+ ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = temCode,
+ SubProtocolRequest = new SubProtocolBuildRequest()
+ {
+ MeterAddress = ammeterInfo.AmmerterAddress,
+ Password = ammeterInfo.Password,
+ ItemCode = subItemCode,
+ }
});
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq);
diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
index fa90fed..e34c087 100644
--- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
+++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
@@ -82,6 +82,7 @@ namespace JiShe.CollectBus.Subscribers
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)]
public async Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage)
{
+ //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。
_logger.LogInformation("电表自动阀控下行消息消费队列开始处理");
return await SendMessagesAsync(receivedMessage);
}
diff --git a/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs
new file mode 100644
index 0000000..449ef9a
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs
@@ -0,0 +1,59 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Consts
+{
+ ///
+ /// T645报文项编码
+ ///
+ public class T645PacketItemCodeConst
+ {
+ #region 跳合闸、报警、保电
+ ///
+ /// 跳闸
+ ///
+ public const string C1C01A = "1C_1A";
+
+ ///
+ /// 单相合闸
+ ///
+ public const string C1C01B = "1C_1B";
+
+ ///
+ /// 三相合闸
+ ///
+ public const string C1C01C = "1C_1C";
+
+ ///
+ /// 触发报警
+ ///
+ public const string C1C02A = "1C_2A";
+
+ ///
+ /// 报警解除
+ ///
+ public const string C1C02B = "1C_2B";
+
+ ///
+ /// 保电开始
+ ///
+ public const string C1C03A = "1C_3A";
+
+ ///
+ /// 保电结束
+ ///
+ public const string C1C03B = "1C_3B";
+ #endregion
+
+ #region 广播校时
+
+ ///
+ /// 广播校时
+ ///
+ public const string C08 = "08";
+ #endregion
+ }
+}
From e8a1a7d23ece26914ada41a5a6648d31a9b7f113 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Fri, 25 Apr 2025 09:28:20 +0800
Subject: [PATCH 6/6] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1?=
=?UTF-8?q?=E5=BA=94=E7=94=A8=E9=85=8D=E7=BD=AE=EF=BC=8C=E5=B0=86=E5=85=B6?=
=?UTF-8?q?=E5=8D=95=E7=8B=AC=E7=AE=A1=E7=90=86=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Consumer/ConsumerService.cs | 16 ++++++++-----
.../Internal/KafkaOptionConfig.cs | 11 +--------
.../Producer/ProducerService.cs | 14 ++++++-----
.../BasicScheduledMeterReadingService.cs | 24 +++++++++++++------
...nergySystemScheduledMeterReadingService.cs | 19 ++++++++-------
.../Models/ServerApplicationOptions.cs | 23 ++++++++++++++++++
.../CollectBusHostModule.cs | 10 +++++++-
web/JiShe.CollectBus.Host/appsettings.json | 11 +++++----
8 files changed, 86 insertions(+), 42 deletions(-)
create mode 100644 shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index d4a62db..7f8c31c 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -1,4 +1,5 @@
using Confluent.Kafka;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
@@ -30,6 +31,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaOptionConfig _kafkaOptionConfig;
+ private readonly ServerApplicationOptions _applicationOptions;
+
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
///
@@ -37,10 +40,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
///
///
- public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline)
+ public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions applicationOptions)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
+ _applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
}
@@ -67,7 +71,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
var config = new ConsumerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
- GroupId = groupId ?? _kafkaOptionConfig.ServerTagName,
+ GroupId = groupId ?? _applicationOptions.ServerTagName,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
@@ -161,7 +165,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
if (_kafkaOptionConfig.EnableFilter)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@@ -244,7 +248,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
if (_kafkaOptionConfig.EnableFilter)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@@ -348,7 +352,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
if (_kafkaOptionConfig.EnableFilter)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@@ -485,7 +489,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
if (_kafkaOptionConfig.EnableFilter)
{
- var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
+ var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
index 3bea5f1..38c9482 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
@@ -8,12 +8,7 @@ public class KafkaOptionConfig
/// kafka地址
///
public string BootstrapServers { get; set; } = null!;
-
- ///
- /// 服务器标识
- ///
- public string ServerTagName { get; set; } = "KafkaFilterKey";
-
+
///
/// kafka主题副本数量
///
@@ -54,8 +49,4 @@ public class KafkaOptionConfig
///
public string? SaslPassword { get; set; }
- ///
- /// 首次采集时间
- ///
- public DateTime? FirstCollectionTime { get; set; }
}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
index 16499b5..50df423 100644
--- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
@@ -5,6 +5,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
@@ -23,18 +24,19 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly ConcurrentDictionary _producerCache = new();
private class KafkaProducer where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
-
+ private readonly ServerApplicationOptions _applicationOptions;
///
/// ProducerService
///
///
///
///
- public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig)
+ public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig, IOptions applicationOptions)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
+ _applicationOptions = applicationOptions.Value;
}
#region private 私有方法
@@ -119,7 +121,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
- { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
+ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
await producer.ProduceAsync(topic, message);
@@ -141,7 +143,7 @@ namespace JiShe.CollectBus.Kafka.Producer
//Key= _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
- { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
+ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
await producer.ProduceAsync(topic, message);
@@ -165,7 +167,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
- { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
+ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
var typeKey = typeof(KafkaProducer);
@@ -200,7 +202,7 @@ namespace JiShe.CollectBus.Kafka.Producer
//Key = _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
- { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
+ { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
var typeKey = typeof(KafkaProducer);
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 4a7d74c..750b938 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,4 +1,5 @@
using JiShe.CollectBus.Application.Contracts;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
@@ -25,6 +26,7 @@ using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
+using Microsoft.Identity.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@@ -44,6 +46,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly KafkaOptionConfig _kafkaOptions;
+ private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;
@@ -56,7 +59,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IProtocolService protocolService,
- IOptions kafkaOptions)
+ IOptions kafkaOptions,
+ IOptions applicationOptions)
{
_logger = logger;
_dbProvider = dbProvider;
@@ -64,6 +68,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
+ _applicationOptions = applicationOptions.Value;
_protocolService = protocolService;
_runtimeContext.UseTableSessionPool = true;
@@ -113,6 +118,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var currentTime = DateTime.Now;
+
+ //定时抄读
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item);
@@ -193,6 +200,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
+
+ //电表定时阀控任务处理。
+ //电表定时广播校时,一天一次。
}
#region 电表采集处理
@@ -279,9 +289,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
- if (_kafkaOptions.FirstCollectionTime.HasValue == false)
+ if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
- _kafkaOptions.FirstCollectionTime = DateTime.Now;
+ _applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
@@ -290,7 +300,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
LastTaskTime = null,
TimeDensity = item.Key,
- NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
+ NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
@@ -724,9 +734,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
- if (_kafkaOptions.FirstCollectionTime.HasValue == false)
+ if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
- _kafkaOptions.FirstCollectionTime = DateTime.Now;
+ _applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
@@ -736,7 +746,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
LastTaskTime = null,
TimeDensity = item.Key,
- NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
+ NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index e2bae0a..fbe667a 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -1,4 +1,5 @@
using JiShe.CollectBus.Application.Contracts;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
@@ -46,6 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ILogger logger,
IIoTDbProvider dbProvider,
IOptions kafkaOptions,
+ IOptions applicationOptions,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
IProtocolService protocolService,
@@ -56,9 +58,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
dbProvider,
runtimeContext,
protocolService,
- kafkaOptions)
+ kafkaOptions,
+ applicationOptions)
{
- serverTagName = kafkaOptions.Value.ServerTagName;
+ serverTagName = applicationOptions.Value.ServerTagName;
_dbProvider = dbProvider;
_logger = logger;
_protocolService = protocolService;
@@ -180,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
///
public override async Task AmmeterScheduledAutoValveControl()
- {
+ {
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm}";
@@ -199,7 +202,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var ammeterInfos = new List();
List taskList = new List();
var metadata = await _dbProvider.GetMetadata();
-
+
foreach (var settingInfo in settingInfos)
{
bool isGenerate = false;
@@ -249,9 +252,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102");
continue;
}
-
+
var temCode = "10_01_";
-
+
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
@@ -272,8 +275,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ItemCode = subItemCode,
}
});
-
- string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq);
+
+ string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
diff --git a/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs
new file mode 100644
index 0000000..6ed9e4a
--- /dev/null
+++ b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs
@@ -0,0 +1,23 @@
+namespace JiShe.CollectBus.Common
+{
+ ///
+ /// 服务器应用配置
+ ///
+ public class ServerApplicationOptions
+ {
+ ///
+ /// 服务器标识
+ ///
+ public required string ServerTagName { get; set; }
+
+ ///
+ /// 首次采集时间
+ ///
+ public DateTime? FirstCollectionTime { get; set; }
+
+ ///
+ /// 自动验证时间
+ ///
+ public required string AutomaticVerificationTime { get; set;}
+ }
+}
diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 5068663..e451a1d 100644
--- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -1,8 +1,10 @@
using Hangfire;
using HealthChecks.UI.Client;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Host.Extensions;
using JiShe.CollectBus.Host.HealthChecks;
using JiShe.CollectBus.Host.Swaggers;
+using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI;
@@ -28,7 +30,7 @@ namespace JiShe.CollectBus.Host
typeof(AbpSwashbuckleModule),
typeof(AbpTimingModule),
typeof(CollectBusApplicationModule),
- typeof(CollectBusMongoDbModule),
+ typeof(CollectBusMongoDbModule),
typeof(AbpCachingStackExchangeRedisModule),
typeof(AbpBackgroundWorkersHangfireModule)
)]
@@ -50,6 +52,12 @@ namespace JiShe.CollectBus.Host
ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration);
Configure(options => { options.Kind = DateTimeKind.Local; });
+
+ Configure(options =>
+ {
+ configuration.GetSection(nameof(ServerApplicationOptions)).Bind(options);
+ });
+
}
diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json
index e5fe35d..4bd24be 100644
--- a/web/JiShe.CollectBus.Host/appsettings.json
+++ b/web/JiShe.CollectBus.Host/appsettings.json
@@ -79,9 +79,7 @@
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
- "NumPartitions": 30,
- "ServerTagName": "JiSheCollectBus100",
- "FirstCollectionTime": "2025-04-22 16:07:00"
+ "NumPartitions": 30
},
"IoTDBOptions": {
"UserName": "root",
@@ -141,5 +139,10 @@
"DefaultIdempotence": true
}
},
- "PlugInFolder": ""
+ "PlugInFolder": "",
+ "ServerApplicationOptions": {
+ "ServerTagName": "JiSheCollectBus100",
+ "FirstCollectionTime": "2025-04-22 16:07:00",
+ "AutomaticVerificationTime": "16:07:00"
+ }
}
\ No newline at end of file