diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index d4a62db..7f8c31c 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; @@ -30,6 +31,8 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly KafkaOptionConfig _kafkaOptionConfig; + private readonly ServerApplicationOptions _applicationOptions; + private readonly KafkaPollyPipeline _kafkaPollyPipeline; /// @@ -37,10 +40,11 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline) + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions applicationOptions) { _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _applicationOptions = applicationOptions.Value; _kafkaPollyPipeline = kafkaPollyPipeline; } @@ -67,7 +71,7 @@ namespace JiShe.CollectBus.Kafka.Consumer var config = new ConsumerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - GroupId = groupId ?? _kafkaOptionConfig.ServerTagName, + GroupId = groupId ?? _applicationOptions.ServerTagName, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 @@ -161,7 +165,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -244,7 +248,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -348,7 +352,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -485,7 +489,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 3bea5f1..38c9482 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -8,12 +8,7 @@ public class KafkaOptionConfig /// kafka地址 /// public string BootstrapServers { get; set; } = null!; - - /// - /// 服务器标识 - /// - public string ServerTagName { get; set; } = "KafkaFilterKey"; - + /// /// kafka主题副本数量 /// @@ -54,8 +49,4 @@ public class KafkaOptionConfig /// public string? SaslPassword { get; set; } - /// - /// 首次采集时间 - /// - public DateTime? FirstCollectionTime { get; set; } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 16499b5..50df423 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Confluent.Kafka; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; @@ -23,18 +24,19 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } private readonly KafkaOptionConfig _kafkaOptionConfig; - + private readonly ServerApplicationOptions _applicationOptions; /// /// ProducerService /// /// /// /// - public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig) + public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig, IOptions applicationOptions) { _configuration = configuration; _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _applicationOptions = applicationOptions.Value; } #region private 私有方法 @@ -119,7 +121,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -141,7 +143,7 @@ namespace JiShe.CollectBus.Kafka.Producer //Key= _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -165,7 +167,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; var typeKey = typeof(KafkaProducer); @@ -200,7 +202,7 @@ namespace JiShe.CollectBus.Kafka.Producer //Key = _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; var typeKey = typeof(KafkaProducer); diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs index c92b704..c398896 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/ProtocolPlugin.cs @@ -1,9 +1,15 @@ -using System; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Enums; +using System; using System.Reflection; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Contracts.SendData; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -11,7 +17,7 @@ using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { - public abstract class ProtocolPlugin:IProtocolPlugin + public abstract class ProtocolPlugin : IProtocolPlugin { //头部字节长度 public const int hearderLen = 6; @@ -22,10 +28,13 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; + private readonly IFreeRedisProvider _redisProvider; + public ProtocolPlugin(IServiceProvider serviceProvider, ILogger logger) { - _logger = logger; + _logger = logger; _protocolInfoRepository = serviceProvider.GetRequiredService>(); + _redisProvider = serviceProvider.GetRequiredService(); } @@ -42,10 +51,11 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts await _protocolInfoRepository.DeleteDirectAsync(a => a.Name == Info.Name); await _protocolInfoRepository.InsertAsync(Info); - //await _protocolInfoCache.Get() + await _redisProvider.Instance.HDelAsync($"{RedisConst.ProtocolKey}", Info.Name); + await _redisProvider.Instance.HSetAsync($"{RedisConst.ProtocolKey}", Info.Name, Info); } - public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T :class; + public abstract Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) where T : class; /// @@ -80,7 +90,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts DA = Analysis_DA(hexStringList), DT = Analysis_DT(hexStringList) }; - return tB3761; + return tB3761; } } catch (Exception ex) @@ -378,5 +388,18 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts /// public static int CalculateFn(string dt1, string dt2) => dt2.HexToDec() * 8 + (8 - dt1.HexTo4BinZero().IndexOf("1")); + #region 下行命令构建 + + /// + /// 组装报文 + /// + /// + /// 设备数据实体 + /// 映射读取执行方法的Code,例如10_1,表示 10H_F1_00000,10H_F1_00001,统一英文下划线分隔 + /// + public abstract Task BuildAsync(ProtocolBuildRequest request); + + #endregion + } } diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 326dad0..9f6ee76 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Contracts.SendData; using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Contracts.Interfaces @@ -18,6 +19,14 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces TB3761? Analysis3761(string messageReceived); + /// + /// 组装报文 + /// + /// 是否需要转发的扩展协议入参对象 + /// 映射读取执行方法的Code,例如10_1,表示10H_F1 + /// + Task BuildAsync(ProtocolBuildRequest request); + //Task LoginAsync(MessageReceivedLogin messageReceived); //Task HeartbeatAsync(MessageReceivedHeartbeat messageReceived); diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs new file mode 100644 index 0000000..31b386b --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildRequest.cs @@ -0,0 +1,30 @@ +using JiShe.CollectBus.Protocol.Contracts.Models; + +namespace JiShe.CollectBus.Protocol.Contracts.SendData +{ + /// + /// 报文构建参数 + /// + public class ProtocolBuildRequest + { + /// + /// 集中器地址 + /// + public required string FocusAddress { get; set; } + + /// + /// 抄读计量点,也就是终端电表对应端口 + /// + public int Pn { get; set; } + + /// + /// 3761协议构建组合功能码 + /// + public required string ItemCode { get; set; } + + /// + /// 集中器转发协议构建构建参数 + /// + public SubProtocolBuildRequest SubProtocolRequest { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs new file mode 100644 index 0000000..09afc47 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/ProtocolBuildResponse.cs @@ -0,0 +1,38 @@ +namespace JiShe.CollectBus.Protocol.Contracts.SendData +{ + /// + /// 报文构建返回结果 + /// + public class ProtocolBuildResponse + { + /// + /// 是否成功 + /// + public bool IsSuccess { get; set; } = false; + + /// + /// 帧功能域AFN + /// + public int AFn { get; set;} + + /// + /// 帧功能域FN + /// + public int Fn { get; set; } + + /// + /// 帧序列域SEQ + /// + public int Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 报文体 + /// + public byte[] Data { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs new file mode 100644 index 0000000..c104ea7 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Models/SubProtocolBuildRequest.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Protocol.Contracts.Models +{ + /// + /// 子协议构建参数 + /// + public class SubProtocolBuildRequest + { + /// + /// 表地址 + /// + public required string MeterAddress { get; set; } + + /// + /// 密码 + /// + public required string Password { get; set; } + + /// + /// 操作码 + /// + public required string ItemCode { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs similarity index 73% rename from shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs rename to protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs index 5db0dd7..1aa2a22 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketBuilder.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketBuilder.cs @@ -1,51 +1,42 @@ -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Models; -using System; -using System.Collections.Generic; -using System.Data.SqlTypes; -using System.Linq; -using System.Net; using System.Reflection; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Common.BuildSendDatas +namespace JiShe.CollectBus.Protocol.Contracts.SendData { /// - /// 构建下发报文,只适用与定时抄读 + /// 构建3761下发报文 /// - public static class TelemetryPacketBuilder + public static class Telemetry3761PacketBuilder { /// /// 构建报文的委托 - /// - /// - /// - /// - public delegate TelemetryPacketResponse AFNDelegate(TelemetryPacketRequest request); + /// + public delegate Telemetry3761PacketResponse T3761Delegate(Telemetry3761PacketRequest request); /// /// 编码与方法的映射表 /// - public static readonly Dictionary AFNHandlersDictionary = new(); + public static readonly Dictionary T3761AFNHandlers = new(); - static TelemetryPacketBuilder() + static Telemetry3761PacketBuilder() { // 初始化时自动注册所有符合命名规则的方法 - var methods = typeof(TelemetryPacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public); + var methods = typeof(Telemetry3761PacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public); foreach (var method in methods) { if (method.Name.StartsWith("AFN") && method.Name.EndsWith("_Fn_Send")) { string code = method.Name; - var delegateInstance = (AFNDelegate)Delegate.CreateDelegate(typeof(AFNDelegate), method); - AFNHandlersDictionary[code] = delegateInstance; + var delegateInstance = (T3761Delegate)Delegate.CreateDelegate(typeof(T3761Delegate), method); + T3761AFNHandlers[code] = delegateInstance; } } } #region AFN_00H 确认∕否认 - public static TelemetryPacketResponse AFN00_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN00_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -64,13 +55,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_01H 复位命令 - public static TelemetryPacketResponse AFN01_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN01_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -89,13 +80,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_02H 链路接口检测 - public static TelemetryPacketResponse AFN02_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN02_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -114,12 +105,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_04H 设置参数 - public static TelemetryPacketResponse AFN04_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN04_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -138,13 +129,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_05H 控制命令 - public static TelemetryPacketResponse AFN05_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN05_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -163,12 +154,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_09H 请求终端配置及信息 - public static TelemetryPacketResponse AFN09_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN09_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -187,13 +178,13 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0AH 查询参数 - public static TelemetryPacketResponse AFN0A_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN0A_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -212,12 +203,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0CH 请求一类数据 - public static TelemetryPacketResponse AFN0C_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN0C_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -236,12 +227,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN_0DH 请求二类数据 - public static TelemetryPacketResponse AFN0D_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN0D_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -260,12 +251,12 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #endregion #region AFN10H 数据转发 - public static TelemetryPacketResponse AFN10_Fn_Send(TelemetryPacketRequest request) + public static Telemetry3761PacketResponse AFN10_Fn_Send(Telemetry3761PacketRequest request) { var reqParameter = new ReqParameter2() { @@ -284,7 +275,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas Fn = request.Fn }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParameter, request.DataUnit); - return new TelemetryPacketResponse() { Seq = reqParameter.Seq, Data = bytes, MSA = reqParameter.MSA, }; + return new Telemetry3761PacketResponse() { Seq = reqParameter.Seq.PRSEQ, Data = bytes, MSA = reqParameter.MSA, }; } #region SpecialAmmeter 特殊电表转发 diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs similarity index 59% rename from shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs rename to protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs index d22f923..df12e09 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketRequest.cs @@ -1,20 +1,14 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.BuildSendDatas +namespace JiShe.CollectBus.Protocol.Contracts.SendData { /// - /// 报文构建参数 + /// 构建3761报文参数 /// - public class TelemetryPacketRequest + public class Telemetry3761PacketRequest { /// /// 集中器地址 /// - public string FocusAddress { get; set; } + public required string FocusAddress { get; set; } /// /// 抄读功能码 @@ -24,7 +18,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// /// 抄读计量点,也就是终端电表对应端口 /// - public int Pn { get; set; } + public int Pn { get; set; } /// /// 透明转发单元 diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs new file mode 100644 index 0000000..c27906f --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/SendData/Telemetry3761PacketResponse.cs @@ -0,0 +1,23 @@ +namespace JiShe.CollectBus.Protocol.Contracts.SendData +{ + /// + /// 返回3761报文结果 + /// + public class Telemetry3761PacketResponse + { + /// + /// 帧序列域SEQ + /// + public int Seq { get; set; } + + /// + /// 地址域A3的主站地址MSA + /// + public int MSA { get; set; } + + /// + /// 报文体 + /// + public byte[] Data { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs index 314c3ab..225179a 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol.Test/JiSheCollectBusTestProtocolModule.cs @@ -1,5 +1,6 @@ using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using System; using Volo.Abp; using Volo.Abp.Modularity; @@ -15,9 +16,30 @@ namespace JiShe.CollectBus.Protocol.Test public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { - Console.WriteLine("TestProtocolPlugin Initialization"); + Console.WriteLine("TestProtocolPlugin OnApplicationInitializationAsync"); var protocol = context.ServiceProvider.GetRequiredKeyedService(nameof(TestProtocolPlugin)); + RemoveServiceAtRuntime(context.ServiceProvider); await protocol.LoadAsync(); } + + public override void OnApplicationShutdown(ApplicationShutdownContext context) + { + Console.WriteLine("TestProtocolPlugin OnApplicationShutdown"); + base.OnApplicationShutdown(context); + } + + public void RemoveServiceAtRuntime(IServiceProvider serviceProvider) + { + var services = serviceProvider.GetService(); + services?.AddKeyedSingleton(nameof(TestProtocolPlugin)); + + + //var services = (IServiceCollection)serviceProvider.GetService(typeof(IServiceCollection)); + //var pluginService = serviceProvider.GetKeyedService(nameof(TestProtocolPlugin)); + //if (services != null && pluginService!=null) + //{ + // services.Remove(pluginService); + //} + } } } diff --git a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index e291aa0..48eb804 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -4,6 +4,7 @@ using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Protocol.Contracts.SendData; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -20,11 +21,16 @@ namespace JiShe.CollectBus.Protocol.Test { } - public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-Test"); + public sealed override ProtocolInfo Info => new(nameof(TestProtocolPlugin), "Test", "TCP", "Test协议", "DTS1980-TEST"); public override Task AnalyzeAsync(ITcpSessionClient client, string messageReceived, Action? receivedAction = null) { throw new NotImplementedException(); } + + public override Task BuildAsync(ProtocolBuildRequest request) + { + throw new NotImplementedException(); + } } } diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 462921a..6b08805 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -11,7 +11,6 @@ using Microsoft.Extensions.Logging; using Serilog.Core; using System; using System.Reflection; -using TouchSocket.Core; using Volo.Abp; using Volo.Abp.Modularity; @@ -21,7 +20,6 @@ namespace JiShe.CollectBus.Protocol { public override void ConfigureServices(ServiceConfigurationContext context) { - Console.WriteLine("StandardProtocolPlugin ConfigureServices"); context.Services.AddKeyedSingleton(nameof(StandardProtocolPlugin)); //RegisterProtocolAnalysis(context.Services); LoadAnalysisStrategy(context.Services); @@ -29,11 +27,17 @@ namespace JiShe.CollectBus.Protocol public override async Task OnApplicationInitializationAsync(ApplicationInitializationContext context) { - Console.WriteLine("StandardProtocolPlugin Initialization"); + Console.WriteLine("StandardProtocolPlugin OnApplicationInitializationAsync"); var standardProtocol = context.ServiceProvider.GetRequiredKeyedService(nameof(StandardProtocolPlugin)); await standardProtocol.LoadAsync(); } + public override void OnApplicationShutdown(ApplicationShutdownContext context) + { + Console.WriteLine("StandardProtocolPlugin OnApplicationShutdown"); + base.OnApplicationShutdown(context); + } + public void LoadAnalysisStrategy(IServiceCollection services) { var assembly = Assembly.GetExecutingAssembly(); diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs new file mode 100644 index 0000000..a7faf67 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketBuilder.cs @@ -0,0 +1,106 @@ +using FreeSql; +using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Models; +using System.Reflection; + +namespace JiShe.CollectBus.Protocol.SendData +{ + /// + /// 构建645-2007下发报文 + /// + public static class Telemetry645PacketBuilder + { + /// + /// 构建报文的委托 + /// + /// + /// + public delegate Telemetry645PacketResponse T645Delegate(Telemetry645PacketRequest request); + + /// + /// 编码与方法的映射表 + /// + public static readonly Dictionary T645ControlHandlers = new(); + + static Telemetry645PacketBuilder() + { + // 初始化时自动注册所有符合命名规则的方法 + var methods = typeof(Telemetry645PacketBuilder).GetMethods(BindingFlags.Static | BindingFlags.Public); + foreach (var method in methods) + { + if (method.Name.StartsWith("C") && method.Name.EndsWith("_Send")) + { + string code = method.Name; + var delegateInstance = (T645Delegate)Delegate.CreateDelegate(typeof(T645Delegate), method); + T645ControlHandlers[code] = delegateInstance; + } + } + } + + #region 1CH 跳合闸、报警、保电 + + /// + /// 1CH 跳合闸 + /// + /// + /// + public static Telemetry645PacketResponse C1C_01_Send(Telemetry645PacketRequest request) + { + var itemCodeArr = request.ItemCode.Split('_'); + var c_data = itemCodeArr[0]; + var n_data = itemCodeArr[1]; + string password = request.Password; + string pwdLevel = "02"; + + if (!string.IsNullOrWhiteSpace(password) && password.Contains("|")) + { + var sp = password.Split('|'); + password = sp[0]; + pwdLevel = sp[1]; + } + + var strDate = DateTime.Now.AddYears(3).ToString("000012ddMMyy").StrAddSpan();//命令有效截止时间 + + var strP = password.StrAddSpan().StrReverseOrder(); + var strSJY = " " + pwdLevel + " " + strP + " 01 00 00 00 " + n_data + " 00 " + strDate; + var dataUnit = strSJY.Replace(" ", "").StringToPairs(); + + var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit); + return new Telemetry645PacketResponse() { Data = dataList }; + } + + + /// + /// 1CH 保电 + /// + /// + /// + public static Telemetry645PacketResponse C1C_03_Send(Telemetry645PacketRequest request) + { + var itemCodeArr = request.ItemCode.Split('_'); + var c_data = itemCodeArr[0]; + var n_data = itemCodeArr[1]; + string password = request.Password; + + if (!string.IsNullOrWhiteSpace(password) && password.Contains("|")) + { + var sp = password.Split('|'); + password = sp[0]; + } + + var strDate = (n_data + DateTime.Now.AddDays(1).ToString("00000012ddMMyy")).StrAddSpan(); + + var strP = password.StrAddSpan().StrReverseOrder(); + + var strSJY = " 02 " + strP + " 01 00 00 00 " + strDate; + + var dataUnit = strSJY.Replace(" ", "").StringToPairs(); + + var dataList = Build645SendData.Build645SendCommand(request.MeterAddress, c_data, dataUnit); + return new Telemetry645PacketResponse() { Data = dataList }; + } + #endregion + } +} diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs new file mode 100644 index 0000000..fd90322 --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketRequest.cs @@ -0,0 +1,23 @@ +namespace JiShe.CollectBus.Protocol.SendData +{ + /// + /// 构建645报文参数 + /// + public class Telemetry645PacketRequest + { + /// + /// 表地址 + /// + public required string MeterAddress { get; set; } + + /// + /// 密码 + /// + public required string Password { get; set; } + + /// + /// 操作码 + /// + public required string ItemCode { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs new file mode 100644 index 0000000..7731e0f --- /dev/null +++ b/protocols/JiShe.CollectBus.Protocol/SendData/Telemetry645PacketResponse.cs @@ -0,0 +1,13 @@ +namespace JiShe.CollectBus.Protocol.SendData +{ + /// + /// 返回645报文结果 + /// + public class Telemetry645PacketResponse + { + /// + /// 报文体 + /// + public List Data { get; set; } + } +} diff --git a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 695434d..4201d04 100644 --- a/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -1,5 +1,4 @@ -using DeviceDetectorNET.Parser.Device; -using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; @@ -11,11 +10,12 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts.Abstracts; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Contracts.SendData; +using JiShe.CollectBus.Protocol.SendData; +using Mapster; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; @@ -30,17 +30,22 @@ namespace JiShe.CollectBus.Protocol private readonly IRepository _deviceRepository; private readonly ITcpService _tcpService; + public readonly Dictionary T3761AFNHandlers; + public readonly Dictionary T645ControlHandlers; + /// /// Initializes a new instance of the class. /// /// The service provider. - public StandardProtocolPlugin(IServiceProvider serviceProvider,ILogger logger, ITcpService tcpService) : base(serviceProvider, logger) + public StandardProtocolPlugin(IServiceProvider serviceProvider, ILogger logger, ITcpService tcpService) : base(serviceProvider, logger) { _logger = logger; //_logger = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); _deviceRepository = serviceProvider.GetRequiredService>(); _tcpService = tcpService; + T3761AFNHandlers = Telemetry3761PacketBuilder.T3761AFNHandlers; + T645ControlHandlers = Telemetry645PacketBuilder.T645ControlHandlers; } public sealed override ProtocolInfo Info => new(nameof(StandardProtocolPlugin), "376.1", "TCP", "376.1协议", "DTS1980"); @@ -144,7 +149,7 @@ namespace JiShe.CollectBus.Protocol /// /// /// - public async Task LoginAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) + public async Task LoginAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq) { string oldClientId = $"{client.Id}"; await client.ResetIdAsync(code); @@ -183,7 +188,7 @@ namespace JiShe.CollectBus.Protocol AFN = AFN.确认或否认, FunCode = (int)CFromStationFunCode.链路数据, PRM = PRM.从动站报文, - A =code, + A = code, Seq = new Seq() { TpV = TpV.附加信息域中无时间标签, @@ -196,11 +201,12 @@ namespace JiShe.CollectBus.Protocol Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - var issuedEventMessage = new IssuedEventMessage - { - ClientId = messageReceivedLoginEvent.ClientId, - DeviceNo = messageReceivedLoginEvent.DeviceNo, - Message = bytes, Type = IssuedEventType.Login, + var issuedEventMessage = new IssuedEventMessage + { + ClientId = messageReceivedLoginEvent.ClientId, + DeviceNo = messageReceivedLoginEvent.DeviceNo, + Message = bytes, + Type = IssuedEventType.Login, MessageId = messageReceivedLoginEvent.MessageId }; if (_tcpService.ClientExists(issuedEventMessage.ClientId)) @@ -219,7 +225,7 @@ namespace JiShe.CollectBus.Protocol /// /// /// - public async Task HeartbeatAsync(ITcpSessionClient client,string messageReceived, string code, int? msa, int? pseq) + public async Task HeartbeatAsync(ITcpSessionClient client, string messageReceived, string code, int? msa, int? pseq) { string clientId = code; @@ -297,8 +303,78 @@ namespace JiShe.CollectBus.Protocol _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{Convert.ToHexString(bytes)}"); await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, issuedEventMessage); } + + } + + + /// + /// 组装报文 + /// + /// 报文构建参数 + /// + public override async Task BuildAsync(ProtocolBuildRequest request) + { + if (request == null) + { + throw new Exception($"{nameof(StandardProtocolPlugin)} 报文构建失败,参数为空"); + } + var itemCodeArr = request.ItemCode.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)aFNStr.HexToDec(); + var fn = int.Parse(itemCodeArr[1]); + Telemetry3761PacketResponse builderResponse = null; + + List dataUnit = new List(); + //数据转发场景 10H_F1_1CH + if (aFNStr == "10" && request.SubProtocolRequest != null && string.IsNullOrWhiteSpace(request.SubProtocolRequest.ItemCode) == false) + { + var t645PacketHandlerName = $"C{request.SubProtocolRequest.ItemCode}_Send"; + Telemetry645PacketResponse t645PacketResponse = null; + + if (T645ControlHandlers != null && T645ControlHandlers.TryGetValue(t645PacketHandlerName + , out var t645PacketHandler)) + { + t645PacketResponse = t645PacketHandler(new Telemetry645PacketRequest() + { + MeterAddress = request.SubProtocolRequest.MeterAddress, + Password = request.SubProtocolRequest.Password, + ItemCode = request.SubProtocolRequest.ItemCode, + }); + } + + if (t645PacketResponse != null) + { + dataUnit = t645PacketResponse.Data; + } + } + + string afnMethonCode = $"AFN{aFNStr}_Fn_Send"; + if (T3761AFNHandlers != null && T3761AFNHandlers.TryGetValue(afnMethonCode + , out var handler)) + { + builderResponse = handler(new Telemetry3761PacketRequest() + { + FocusAddress = request.FocusAddress, + Fn = fn, + Pn = request.Pn, + DataUnit = dataUnit, + }); + } + + if (builderResponse == null) + { + return new ProtocolBuildResponse(); + } + + var result = builderResponse.Adapt(); + result.IsSuccess = true; + + return await Task.FromResult(result); + } + + #region 上行命令 //68 diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 63566bf..0a94586 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -11,6 +11,7 @@ using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Provider; +using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index b054779..f5f2e6b 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -23,11 +23,13 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using Volo.Abp.Domain.Repositories; using System.Diagnostics; using System.Linq; +using System.Reflection; using Cassandra; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.IotSystems.Protocols; using TouchSocket.Core; using Volo.Abp.Modularity; +using JiShe.CollectBus.DynamicModule; namespace JiShe.CollectBus.Samples; @@ -39,18 +41,20 @@ public class TestAppService : CollectBusAppService private readonly ICassandraProvider _cassandraProvider; private readonly IProtocolService _protocolService; private readonly IServiceProvider _serviceProvider; + private readonly IDynamicModuleManager _dynamicModuleManager; public TestAppService( ILogger logger, ICassandraRepository messageReceivedCassandraRepository, - ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider) + ICassandraProvider cassandraProvider, IProtocolService protocolService,IServiceProvider serviceProvider, IDynamicModuleManager dynamicModuleManager) { _logger = logger; _messageReceivedCassandraRepository = messageReceivedCassandraRepository; _cassandraProvider = cassandraProvider; _protocolService = protocolService; _serviceProvider = serviceProvider; + _dynamicModuleManager = dynamicModuleManager; } public async Task AddMessageOfCassandra() { @@ -146,6 +150,9 @@ public class TestAppService : CollectBusAppService // 重新加载插件方法 public async Task ReloadPluginsAsync() { - //_serviceProvider.GetService<>() + var aa = Assembly.LoadFile( + @"D:\Codes\CollectBusV5\JiShe.CollectBus\web\JiShe.CollectBus.Host\bin\Debug\net8.0\Plugins\JiShe.CollectBus.Protocol.Test.dll"); + var module = aa.GetTypes().First(a=> typeof(IAbpModule).IsAssignableFrom(a)); + await _dynamicModuleManager.ReinitializeModuleAsync(module); } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index f50d121..750b938 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -7,6 +8,7 @@ using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.EnergySystems.Entities; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; @@ -19,14 +21,18 @@ using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Contracts.SendData; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Microsoft.Identity.Client; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using static System.Runtime.CompilerServices.RuntimeHelpers; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -40,9 +46,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; + private readonly ServerApplicationOptions _applicationOptions; private readonly IoTDBRuntimeContext _runtimeContext; - private readonly IServiceProvider _serviceProvider; - + private readonly IProtocolService _protocolService; + int pageSize = 3000; public BasicScheduledMeterReadingService( @@ -51,8 +58,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, IoTDBRuntimeContext runtimeContext, - IServiceProvider serviceProvider, - IOptions kafkaOptions) + IProtocolService protocolService, + IOptions kafkaOptions, + IOptions applicationOptions) { _logger = logger; _dbProvider = dbProvider; @@ -60,7 +68,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; - _serviceProvider = serviceProvider; + _applicationOptions = applicationOptions.Value; + _protocolService = protocolService; _runtimeContext.UseTableSessionPool = true; } @@ -109,6 +118,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var currentTime = DateTime.Now; + + //定时抄读 foreach (var item in taskInfos) { var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item); @@ -145,15 +156,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading timeDensity: timeDensity, nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, - taskCreateAction: (timeDensity, data, groupIndex, timestamps) => + taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => { - var tempTask = AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _dbProvider.BatchInsertAsync(metadata, tempTask); + _ = _dbProvider.BatchInsertAsync(metadata, tempTask); }); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) @@ -163,16 +174,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading timeDensity: timeDensity, nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.WaterMeter, - taskCreateAction: (timeDensity, data, groupIndex, timestamps) => + taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => { - var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); + var tempTask = await WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } - _dbProvider.BatchInsertAsync(metadata, tempTask); + _ = _dbProvider.BatchInsertAsync(metadata, tempTask); }); } else @@ -189,6 +200,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } + + //电表定时阀控任务处理。 + //电表定时广播校时,一天一次。 } #region 电表采集处理 @@ -275,9 +289,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - if (_kafkaOptions.FirstCollectionTime.HasValue == false) + if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _kafkaOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = DateTime.Now; } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) @@ -286,7 +300,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { LastTaskTime = null, TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -500,11 +514,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - private List AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + private async Task> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + //根据电表型号获取协议插件 + var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); + if (protocolPlugin == null) + { + //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); + //return; + } + + + //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) @@ -601,43 +624,18 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var itemCodeArr = tempItem.Split('_'); - var aFNStr = itemCodeArr[0]; - var aFN = (AFN)aFNStr.HexToDec(); - var fn = int.Parse(itemCodeArr[1]); - TelemetryPacketResponse builderResponse = null; - if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) - { - //实时数据 - builderResponse = TelemetryPacketBuilder.AFN0C_Fn_Send(new TelemetryPacketRequest() - { - FocusAddress = ammeterInfo.FocusAddress, - Fn = fn, - Pn = ammeterInfo.MeteringCode - }); - } - else - { - string methonCode = $"AFN{aFNStr}_Fn_Send"; - //特殊表暂不处理 - if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode - , out var handler)) - { - builderResponse = handler(new TelemetryPacketRequest() - { - FocusAddress = ammeterInfo.FocusAddress, - Fn = fn, - Pn = ammeterInfo.MeteringCode - }); - } - else - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); - continue; - } - } - //TODO:特殊表 + //var itemCodeArr = tempItem.Split('_'); + //var aFNStr = itemCodeArr[0]; + //var aFN = (AFN)aFNStr.HexToDec(); + //var fn = int.Parse(itemCodeArr[1]); + //TODO:特殊表 + ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Pn = ammeterInfo.MeteringCode, + ItemCode = tempItem, + }); if (builderResponse == null || builderResponse.Data.Length <= 0) { //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); @@ -645,7 +643,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } - string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA); + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { SystemName = SystemType, @@ -657,9 +655,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - AFN = (int)aFN, - Fn = fn, - //Seq = builderResponse.Seq, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = tempItem, TaskMark = taskMark, @@ -736,9 +734,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - if (_kafkaOptions.FirstCollectionTime.HasValue == false) + if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _kafkaOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = DateTime.Now; } //先处理采集频率任务缓存 @@ -748,7 +746,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { LastTaskTime = null, TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -851,10 +849,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 时间格式的任务批次名称 /// - private List WatermeterCreatePublishTaskAction(int timeDensity + private async Task> WatermeterCreatePublishTaskAction(int timeDensity , WatermeterInfo watermeter, int groupIndex, DateTime timestamps) { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var currentTime = DateTime.Now; string typeName; @@ -912,10 +909,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading tempCodes = new List() { "10_1" }; } - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + //根据表型号获取协议插件 + var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code); if (protocolPlugin == null) { - _logger.LogError("协议不存在!"); + //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); + //return; } foreach (var tempItem in tempCodes) @@ -931,39 +930,57 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var itemCodeArr = tempItem.Split('_'); - var aFNStr = itemCodeArr[0]; - var aFN = (AFN)aFNStr.HexToDec(); - var fn = int.Parse(itemCodeArr[1]); - TelemetryPacketResponse builderResponse = null; + //var itemCodeArr = tempItem.Split('_'); + //var aFNStr = itemCodeArr[0]; + //var aFN = (AFN)aFNStr.HexToDec(); + //var fn = int.Parse(itemCodeArr[1]); + //TelemetryPacketResponse builderResponse = null; - string methonCode = $"AFN{aFNStr}_Fn_Send"; - //特殊表暂不处理 - if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode - , out var handler)) + //string methonCode = $"AFN{aFNStr}_Fn_Send"; + ////特殊表暂不处理 + //if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + // , out var handler)) + //{ + // builderResponse = handler(new TelemetryPacketRequest() + // { + // FocusAddress = watermeter.FocusAddress, + // Fn = fn, + // Pn = watermeter.MeteringCode, + // DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address), + // }); + //} + //else + //{ + // _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}无效编码。"); + // continue; + //} + + ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() { - builderResponse = handler(new TelemetryPacketRequest() + FocusAddress = watermeter.FocusAddress, + Pn = watermeter.MeteringCode, + ItemCode = tempItem, + SubProtocolRequest = new SubProtocolBuildRequest() { - FocusAddress = watermeter.FocusAddress, - Fn = fn, - Pn = watermeter.MeteringCode, - DataUnit = Build188SendData.Build188WaterMeterReadingSendDataUnit(watermeter.Address), - }); - } - else + MeterAddress = watermeter.MeterAddress, + Password = watermeter.Password, + ItemCode = tempItem, + } + }); + if (builderResponse == null || builderResponse.Data.Length <= 0) { - _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}无效编码。"); + //_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); continue; } if (builderResponse == null || builderResponse.Data.Length <= 0) { - _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的电表{watermeter.Name}采集项{tempItem}未能正确获取报文。"); + _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}未能正确获取报文。"); continue; } - string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, watermeter.MeteringCode, builderResponse.MSA); + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq); var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { SystemName = SystemType, @@ -975,9 +992,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = watermeter.MeterAddress, - AFN = (int)aFN, - Fn = fn, - //Seq = builderResponse.Seq, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = tempItem, TaskMark = taskMark, @@ -1102,8 +1119,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading bool hasNext; var stopwatch = Stopwatch.StartNew(); - var ddd = _runtimeContext.UseTableSessionPool; - do { options.PageIndex = pageNumber++; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 82e9588..fbe667a 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -16,6 +17,8 @@ using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Protocol.Contracts.SendData; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -38,28 +41,30 @@ namespace JiShe.CollectBus.ScheduledMeterReading string serverTagName = string.Empty; private readonly ILogger _logger; private readonly IIoTDbProvider _dbProvider; - private readonly IServiceProvider _serviceProvider; + private readonly IProtocolService _protocolService; public EnergySystemScheduledMeterReadingService( ILogger logger, IIoTDbProvider dbProvider, IOptions kafkaOptions, + IOptions applicationOptions, IoTDBRuntimeContext runtimeContext, IProducerService producerService, - IServiceProvider serviceProvider, + IProtocolService protocolService, IRedisDataCacheService redisDataCacheService) : base(logger, producerService, redisDataCacheService, dbProvider, runtimeContext, - serviceProvider, - kafkaOptions) + protocolService, + kafkaOptions, + applicationOptions) { - serverTagName = kafkaOptions.Value.ServerTagName; + serverTagName = applicationOptions.Value.ServerTagName; _dbProvider = dbProvider; _logger = logger; - _serviceProvider = serviceProvider; + _protocolService = protocolService; } public sealed override string SystemType => SystemTypeConst.Energy; @@ -179,13 +184,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public override async Task AmmeterScheduledAutoValveControl() { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; - var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm}"; try { + //获取电表阀控配置 var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); if (settingInfos == null || settingInfos.Count <= 0) @@ -198,7 +202,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var ammeterInfos = new List(); List taskList = new List(); var metadata = await _dbProvider.GetMetadata(); - + foreach (var settingInfo in settingInfos) { bool isGenerate = false; @@ -226,16 +230,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取对应的缓存电表信息 var ammeterInfo = ammeterInfos.First(); bool tripStateResult = false; + string subItemCode = string.Empty; if (settingInfo.TripType.Equals("on")) { ammeterInfo.TripState = 0; tripStateResult = true; + subItemCode = T645PacketItemCodeConst.C1C01C; + if (ammeterInfo.TypeName != 1) + { + subItemCode = T645PacketItemCodeConst.C1C01B; + } } else if (settingInfo.TripType.Equals("off")) { ammeterInfo.TripState = 1; tripStateResult = false; - + subItemCode = T645PacketItemCodeConst.C1C01A; } else { @@ -243,44 +253,30 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); + var temCode = "10_01_"; + + //根据电表型号获取协议插件 + var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); if (protocolPlugin == null) { - _logger.LogError("协议不存在!"); + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); + return; } - var temCode = "10_98"; - var itemCodeArr = temCode.Split('_'); - var aFNStr = itemCodeArr[0]; - var aFN = (AFN)(aFNStr.HexToDec()); - var fn = int.Parse(itemCodeArr[1]); - TelemetryPacketResponse builderResponse = null; - string methonCode = $"AFN{aFNStr}_Fn_Send"; - //特殊表暂不处理 - if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode - , out var handler)) + ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest() { - builderResponse = handler(new TelemetryPacketRequest() + FocusAddress = ammeterInfo.FocusAddress, + Pn = ammeterInfo.MeteringCode, + ItemCode = temCode, + SubProtocolRequest = new SubProtocolBuildRequest() { - FocusAddress = ammeterInfo.FocusAddress, - Fn = fn, - Pn = ammeterInfo.MeteringCode, - DataUnit = Build645SendData.BuildAmmeterValveControlSendDataUnit(ammeterInfo.AmmerterAddress, "", ammeterInfo.Password, tripStateResult),//生成阀控报文 - }); - } - else - { - _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}无效编码,-103"); - continue; - } + MeterAddress = ammeterInfo.AmmerterAddress, + Password = ammeterInfo.Password, + ItemCode = subItemCode, + } + }); - if (builderResponse == null) - { - _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}报文构建失败,-104"); - continue; - } - - string taskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode, builderResponse.MSA); + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { SystemName = SystemType, @@ -292,9 +288,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading PendingCopyReadTime = currentTime, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - AFN = (int)aFN, - Fn = fn, - //Seq = builderResponse.Seq, + AFN = builderResponse.AFn, + Fn = builderResponse.Fn, + Seq = builderResponse.Seq, MSA = builderResponse.MSA, ItemCode = temCode, TaskMark = taskMark, @@ -310,7 +306,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (taskList == null || taskList.Count <= 0) { - _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-105"); + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106"); return; } @@ -334,9 +330,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw; } - - - throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); } /// diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index fa90fed..e34c087 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -82,6 +82,7 @@ namespace JiShe.CollectBus.Subscribers [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)] public async Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) { + //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 _logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); return await SendMessagesAsync(receivedMessage); } diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs index 07b5369..e39b2bd 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -104,10 +104,10 @@ namespace JiShe.CollectBus.IotSystems.MeterReadingRecords [FIELDColumn] public string ItemCode { get; set; } - ///// - ///// 帧序列域SEQ - ///// - //public required Seq Seq { get; set; } + /// + /// 帧序列域 SEQ + /// + public int Seq { get; set; } /// /// 地址域A3的主站地址MSA diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs index 6c4e960..023cb7f 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build645SendData.cs @@ -69,6 +69,7 @@ namespace JiShe.CollectBus.Common.BuildSendDatas var strP = password.StrAddSpan().StrReverseOrder(); var strSJY = " " + pwdLevel + " " + strP + " 01 00 00 00 " + code + " 00 " + strDate; var dataUnit = strSJY.Replace(" ", "").StringToPairs(); + var dataList = Build645SendCommand(ammeterAddress, "1C", dataUnit); return dataList; diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs deleted file mode 100644 index 8cd964a..0000000 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/TelemetryPacketResponse.cs +++ /dev/null @@ -1,30 +0,0 @@ -using JiShe.CollectBus.Common.Models; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.BuildSendDatas -{ - /// - /// 报文构建返回结果 - /// - public class TelemetryPacketResponse - { - /// - /// 帧序列域SEQ - /// - public required Seq Seq { get; set; } - - /// - /// 地址域A3的主站地址MSA - /// - public int MSA { get; set; } - - /// - /// 报文体 - /// - public required byte[] Data { get; set; } - } -} diff --git a/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs b/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs new file mode 100644 index 0000000..449ef9a --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Consts/T645PacketItemCodeConst.cs @@ -0,0 +1,59 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Consts +{ + /// + /// T645报文项编码 + /// + public class T645PacketItemCodeConst + { + #region 跳合闸、报警、保电 + /// + /// 跳闸 + /// + public const string C1C01A = "1C_1A"; + + /// + /// 单相合闸 + /// + public const string C1C01B = "1C_1B"; + + /// + /// 三相合闸 + /// + public const string C1C01C = "1C_1C"; + + /// + /// 触发报警 + /// + public const string C1C02A = "1C_2A"; + + /// + /// 报警解除 + /// + public const string C1C02B = "1C_2B"; + + /// + /// 保电开始 + /// + public const string C1C03A = "1C_3A"; + + /// + /// 保电结束 + /// + public const string C1C03B = "1C_3B"; + #endregion + + #region 广播校时 + + /// + /// 广播校时 + /// + public const string C08 = "08"; + #endregion + } +} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 11ba0a6..1f37b10 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -768,10 +768,11 @@ namespace JiShe.CollectBus.Common.Helpers /// /// /// + /// /// - public static string GetTaskMark(int afn, int fn, int pn, int msa) + public static string GetTaskMark(int afn, int fn, int pn, int msa,int seq) { - var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}"; + var makstr = $"{afn.ToString().PadLeft(2, '0')}{fn.ToString().PadLeft(2, '0')}{pn.ToString().PadLeft(2, '0')}{msa.ToString().PadLeft(2, '0')}{seq.ToString().PadLeft(2, '0')}"; return makstr;// Convert.ToInt32(makstr) << 32 | msa; } diff --git a/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs new file mode 100644 index 0000000..6ed9e4a --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs @@ -0,0 +1,23 @@ +namespace JiShe.CollectBus.Common +{ + /// + /// 服务器应用配置 + /// + public class ServerApplicationOptions + { + /// + /// 服务器标识 + /// + public required string ServerTagName { get; set; } + + /// + /// 首次采集时间 + /// + public DateTime? FirstCollectionTime { get; set; } + + /// + /// 自动验证时间 + /// + public required string AutomaticVerificationTime { get; set;} + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs new file mode 100644 index 0000000..6bbc8e7 --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/DynamicModuleManager.cs @@ -0,0 +1,182 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Linq; +using System.Reflection; +using System.Threading.Tasks; +using Volo.Abp; +using Volo.Abp.DependencyInjection; +using Volo.Abp.Modularity; + +namespace JiShe.CollectBus.DynamicModule +{ + /// + /// 动态模块管理器的实现 + /// + public class DynamicModuleManager : IDynamicModuleManager, ISingletonDependency + { + private readonly IModuleContainer _moduleContainer; + private readonly IServiceProvider _serviceProvider; + + public DynamicModuleManager( + IModuleContainer moduleContainer, + IServiceProvider serviceProvider) + { + _moduleContainer = moduleContainer; + _serviceProvider = serviceProvider; + } + + public Type[] GetRegisteredModuleTypes() + { + return _moduleContainer.Modules.Select(m => m.Type).ToArray(); + } + + public async Task InitializeModuleAsync(Type moduleType) + { + //if (!typeof(IAbpModule).IsAssignableFrom(moduleType)) + //{ + // throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType)); + //} + + //var module = (IAbpModule)Activator.CreateInstance(moduleType); + + //// 配置服务 + //var configureServicesMethod = moduleType.GetMethod("ConfigureServices", + // BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy); + + //if (configureServicesMethod != null) + //{ + // var serviceConfigurationContext = CreateServiceConfigurationContext(); + // configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext }); + //} + + //await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) }); + + + } + + public async Task ReinitializeModuleAsync(Type moduleType) + { + if (!typeof(IAbpModule).IsAssignableFrom(moduleType)) + { + throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType)); + } + + var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type.Name == moduleType.Name); + if (moduleDescriptor == null) + { + throw new InvalidOperationException($"找不到类型为 {moduleType.FullName} 的模块"); + } + + var module = moduleDescriptor.Instance; + + await CallModuleMethodAsync(module, "OnApplicationShutdown", new object[] { new ApplicationShutdownContext(_serviceProvider) }); + //var configureServicesMethod = moduleType.GetMethod("ConfigureServices", + // BindingFlags.Public | BindingFlags.Instance | BindingFlags.FlattenHierarchy); + + //if (configureServicesMethod != null) + //{ + // var serviceConfigurationContext = CreateServiceConfigurationContext(); + // configureServicesMethod.Invoke(module, new object[] { serviceConfigurationContext }); + //} + await CallModuleMethodAsync(module, "OnApplicationInitializationAsync", new object[] { new ApplicationInitializationContext(_serviceProvider) }); + } + + public async Task UnloadModuleAsync(Type moduleType) + { + if (!typeof(IAbpModule).IsAssignableFrom(moduleType)) + { + throw new ArgumentException($"指定的类型 {moduleType.FullName} 不是有效的ABP模块类型", nameof(moduleType)); + } + + var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type.Name == moduleType.Name); + if (moduleDescriptor == null) + { + throw new InvalidOperationException($"找不到类型为 {moduleType.FullName} 的模块"); + } + + var module = moduleDescriptor.Instance; + + await CallModuleMethodAsync(module, "OnApplicationShutdown", new object[] { new ApplicationShutdownContext(_serviceProvider) }); + moduleDescriptor.GetType().GetProperty("IsUnloaded", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)? + .SetValue(moduleDescriptor, true); + } + + public bool IsModuleLoaded(Type moduleType) + { + var moduleDescriptor = _moduleContainer.Modules.FirstOrDefault(m => m.Type == moduleType); + if (moduleDescriptor == null) + { + return false; + } + + // 检查模块是否已被标记为卸载 + var isUnloaded = (bool?)moduleDescriptor.GetType().GetProperty("IsUnloaded", BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)? + .GetValue(moduleDescriptor) ?? false; + + return !isUnloaded; + } + + public Type GetModuleTypeByName(string moduleName) + { + return _moduleContainer.Modules + .FirstOrDefault(m => m.Type.Name == moduleName || m.Type.FullName == moduleName)? + .Type; + } + + private async Task CallModuleMethodAsync(IAbpModule module, string methodName, object[] parameters) + { + var method = module.GetType().GetMethod(methodName); + if (method == null) + { + return; + } + + if (method.ReturnType == typeof(Task)) + { + await (Task)method.Invoke(module, parameters); + } + else + { + method.Invoke(module, parameters); + } + } + + private ServiceConfigurationContext CreateServiceConfigurationContext() + { + // 反射获取内部构造函数 + var contextType = typeof(ServiceConfigurationContext); + var constructor = contextType.GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).FirstOrDefault(); + + if (constructor != null) + { + // 创建新的服务集合并添加现有服务 + var services = new ServiceCollection(); + foreach (var service in ((IServiceCollection)_serviceProvider.GetService()) ?? new ServiceCollection()) + { + services.Add(service); + } + + return (ServiceConfigurationContext)constructor.Invoke(new object[] { services }); + } + + // 如果反射失败,使用备选方案 + try + { + var bindingFlags = BindingFlags.NonPublic | BindingFlags.Instance; + var services = new ServiceCollection(); + + var context = Activator.CreateInstance(contextType, bindingFlags, null, null, null); + + var servicesProperty = contextType.GetProperty("Services"); + servicesProperty?.SetValue(context, services); + + return (ServiceConfigurationContext)context; + } + catch + { + throw new InvalidOperationException("无法创建ServiceConfigurationContext。这可能是由于与ABP框架版本不兼容造成的。"); + } + } + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs new file mode 100644 index 0000000..3f87820 --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/DynamicModule/IDynamicModuleManager.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.DynamicModule +{ + /// + /// 提供动态管理ABP模块的功能 + /// + public interface IDynamicModuleManager + { + /// + /// 获取已注册的模块类型 + /// + /// 当前应用程序中已注册的所有模块类型 + Type[] GetRegisteredModuleTypes(); + + /// + /// 重新初始化指定的模块 + /// + /// 要重新初始化的模块类型 + /// 表示异步操作的任务 + Task ReinitializeModuleAsync(Type moduleType); + + /// + /// 卸载指定的模块 + /// + /// 要卸载的模块类型 + /// 表示异步操作的任务 + Task UnloadModuleAsync(Type moduleType); + + /// + /// 检查模块是否已加载 + /// + /// 要检查的模块类型 + /// 如果模块已加载,则为true;否则为false + bool IsModuleLoaded(Type moduleType); + + /// + /// 根据模块名称获取模块类型 + /// + /// 模块名称 + /// 模块类型,如果找不到则为null + Type GetModuleTypeByName(string moduleName); + } +} \ No newline at end of file diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs index 5068663..e451a1d 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -1,8 +1,10 @@ using Hangfire; using HealthChecks.UI.Client; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Host.Extensions; using JiShe.CollectBus.Host.HealthChecks; using JiShe.CollectBus.Host.Swaggers; +using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.MongoDB; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Swashbuckle.AspNetCore.SwaggerUI; @@ -28,7 +30,7 @@ namespace JiShe.CollectBus.Host typeof(AbpSwashbuckleModule), typeof(AbpTimingModule), typeof(CollectBusApplicationModule), - typeof(CollectBusMongoDbModule), + typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), typeof(AbpBackgroundWorkersHangfireModule) )] @@ -50,6 +52,12 @@ namespace JiShe.CollectBus.Host ConfigureCustom(context, configuration); ConfigureHealthChecks(context, configuration); Configure(options => { options.Kind = DateTimeKind.Local; }); + + Configure(options => + { + configuration.GetSection(nameof(ServerApplicationOptions)).Bind(options); + }); + } diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index e5fe35d..4bd24be 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -79,9 +79,7 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus100", - "FirstCollectionTime": "2025-04-22 16:07:00" + "NumPartitions": 30 }, "IoTDBOptions": { "UserName": "root", @@ -141,5 +139,10 @@ "DefaultIdempotence": true } }, - "PlugInFolder": "" + "PlugInFolder": "", + "ServerApplicationOptions": { + "ServerTagName": "JiSheCollectBus100", + "FirstCollectionTime": "2025-04-22 16:07:00", + "AutomaticVerificationTime": "16:07:00" + } } \ No newline at end of file