diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs index 22c79e6..f9bc706 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs @@ -7,7 +7,8 @@ namespace JiShe.CollectBus.Subscribers { public interface ISubscriberAppService : IApplicationService { - Task IssuedEvent(IssuedEventMessage issuedEventMessage); + Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); + Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); Task ReceivedEvent(MessageReceived receivedMessage); Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 07db264..0542ede 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -71,7 +71,7 @@ namespace JiShe.CollectBus.EnergySystem return result; - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -108,7 +108,7 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -149,7 +149,7 @@ namespace JiShe.CollectBus.EnergySystem }).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -178,7 +178,7 @@ namespace JiShe.CollectBus.EnergySystem { var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress); var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -261,7 +261,7 @@ namespace JiShe.CollectBus.EnergySystem if (bytes != null) { - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -320,7 +320,7 @@ namespace JiShe.CollectBus.EnergySystem var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort, backupIP, backupPort, input.Data.APN); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -347,7 +347,7 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -375,7 +375,7 @@ namespace JiShe.CollectBus.EnergySystem bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送 var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -402,7 +402,7 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildConrRebootSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -430,7 +430,7 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -479,7 +479,7 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -548,7 +548,7 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -577,7 +577,7 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime, input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList()); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -605,7 +605,7 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -631,7 +631,7 @@ namespace JiShe.CollectBus.EnergySystem var result = new BaseResultDto(); var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -658,7 +658,7 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{data.AreaCode}{data.Address}"; var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, @@ -713,7 +713,7 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage + await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 8f37fb4..7224cdb 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -19,6 +19,7 @@ using Volo.Abp.Caching; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Repositories; +using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.Plugins { @@ -78,7 +79,7 @@ namespace JiShe.CollectBus.Plugins } else { - await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1); + await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1,aFn.ToString()!.PadLeft(2,'0')); } } else @@ -160,7 +161,7 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); + await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); //await _producerBus.Publish( messageReceivedLoginEvent); } @@ -199,11 +200,19 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); + await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); //await _producerBus.Publish(messageReceivedHeartbeatEvent); } - private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo) + /// + /// 正常帧处理,将不同的AFN进行分发 + /// + /// + /// + /// + /// + /// + private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo,string aFn) { //await _producerBus.Publish(new MessageReceived //{ @@ -215,7 +224,10 @@ namespace JiShe.CollectBus.Plugins // MessageId = NewId.NextGuid().ToString() //}); - await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + + string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); + + await _producerBus.PublishAsync(topicName, new MessageReceived { ClientId = client.Id, ClientIp = client.IP, diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index aa86101..a801454 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -60,9 +60,38 @@ namespace JiShe.CollectBus.Subscribers _meterReadingRecordsRepository = meterReadingRecordsRepository; } - [CapSubscribe(ProtocolConst.SubscriberIssuedEventName)] - public async Task IssuedEvent(IssuedEventMessage issuedEventMessage) + [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) { + switch (issuedEventMessage.Type) + { + case IssuedEventType.Heartbeat: + break; + case IssuedEventType.Login: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); + var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); + loginEntity.AckTime = Clock.Now; + loginEntity.IsAck = true; + await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); + break; + case IssuedEventType.Data: + break; + default: + throw new ArgumentOutOfRangeException(); + } + + //var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); + //if (device != null) + //{ + // await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); + //} + + await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + } + + [CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + { switch (issuedEventMessage.Type) { case IssuedEventType.Heartbeat: @@ -72,13 +101,6 @@ namespace JiShe.CollectBus.Subscribers heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); break; - case IssuedEventType.Login: - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); - loginEntity.AckTime = Clock.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - break; case IssuedEventType.Data: break; default: @@ -130,15 +152,14 @@ namespace JiShe.CollectBus.Subscribers Pn = 0, FocusAddress = "", MeterAddress = "", + //DataResult = tb3761FN.Text, }); - //todo 将解析结果插入IoTDB,时标从 - //await _messageReceivedEventRepository.InsertAsync(receivedMessage); } } - [CapSubscribe(ProtocolConst.SubscriberReceivedHeartbeatEventName)] + [CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); @@ -153,7 +174,7 @@ namespace JiShe.CollectBus.Subscribers } } - [CapSubscribe(ProtocolConst.SubscriberReceivedLoginEventName)] + [CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs index 222984e..f54a3a4 100644 --- a/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/EnumExtensions.cs @@ -49,6 +49,21 @@ namespace JiShe.CollectBus.Common.Extensions ); } + /// + /// 将枚举转换为字典 + /// + /// + /// + public static Dictionary ToNameValueDictionary() where TEnum : Enum + { + return Enum.GetValues(typeof(TEnum)) + .Cast() + .ToDictionary( + e => e.ToString(), + e => Convert.ToInt32(e) + ); + } + /// /// 将枚举转换为字典 /// diff --git a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index fa38d1d..8c980c5 100644 --- a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -760,5 +760,39 @@ namespace JiShe.CollectBus.Common.Helpers return fontValue; } + + /// + /// 获取设备Id哈希值 + /// + /// + /// + /// + public static int GetDeviceHashCode(string deviceId, int TotalShards = 100) + { + // 计算哈希分组ID + return Math.Abs(deviceId.GetHashCode() % TotalShards); + } + + /// + /// 获取设备Id哈希分组 + /// + /// + /// + public static Dictionary> GetDeviceHashGroup(List deviceList) + { + Dictionary> keyValuePairs = new Dictionary>(); + foreach (var deviceId in deviceList) + { + var hashCode = GetDeviceHashCode(deviceId); + + if (!keyValuePairs.ContainsKey(hashCode.ToString())) + { + keyValuePairs.Add(hashCode.ToString(), new List()); + } + + keyValuePairs[hashCode.ToString()].Add(deviceId); + } + return keyValuePairs; + } } } diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 80145ab..32c39b5 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -257,6 +257,9 @@ namespace JiShe.CollectBus.Host /// The configuration. public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration) { + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + context.Services.AddCap(x => { x.DefaultGroupName = ProtocolConst.SubscriberGroup; @@ -282,9 +285,7 @@ namespace JiShe.CollectBus.Host /// The configuration. /// /// Configures the mass transit. - /// - /// The context. - /// The configuration. + /// public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) { @@ -296,7 +297,9 @@ namespace JiShe.CollectBus.Host try { - var topics = ProtocolConstExtensions.GetAllTopicNames(); + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + List topicSpecifications = new List(); foreach (var item in topics) { @@ -348,20 +351,20 @@ namespace JiShe.CollectBus.Host }); rider.AddConsumer(); - rider.AddProducer(ProtocolConst.SubscriberReceivedLoginEventName); - rider.AddProducer(ProtocolConst.SubscriberReceivedHeartbeatEventName); + rider.AddProducer(ProtocolConst.SubscriberLoginReceivedEventName); + rider.AddProducer(ProtocolConst.SubscriberHeartbeatReceivedEventName); rider.UsingKafka((c, cfg) => { cfg.Host(configuration.GetConnectionString("Kafka")); - cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator => + cfg.TopicEndpoint(ProtocolConst.SubscriberHeartbeatReceivedEventName, consumerConfig, configurator => { configurator.AutoOffsetReset = AutoOffsetReset.Earliest; configurator.ConfigureConsumer(c); }); - cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, consumerConfig, configurator => + cfg.TopicEndpoint(ProtocolConst.SubscriberLoginReceivedEventName, consumerConfig, configurator => { configurator.ConfigureConsumer(c); configurator.AutoOffsetReset = AutoOffsetReset.Earliest; @@ -373,7 +376,7 @@ namespace JiShe.CollectBus.Host configurator.AutoOffsetReset = AutoOffsetReset.Earliest; }); - cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, consumerConfig, configurator => + cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator => { configurator.ConfigureConsumer(c); configurator.AutoOffsetReset = AutoOffsetReset.Earliest; diff --git a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs index 0a5ab7a..7e34031 100644 --- a/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs +++ b/src/JiShe.CollectBus.MongoDB/Repository/MeterReadingRecord/MeterReadingRecordRepository.cs @@ -120,7 +120,7 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord public async Task FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime) { var collection = await GetShardedCollection(dateTime); - await collection.findon + //await collection.findon throw new NotImplementedException(); } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 28cf6c2..762a2ca 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -87,7 +87,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } @@ -126,7 +126,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs index 9455144..8c097e2 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs @@ -1,4 +1,6 @@ -using System; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using System; using System.Collections.Generic; using System.Linq; using System.Reflection; @@ -10,10 +12,10 @@ namespace JiShe.CollectBus.Protocol.Contracts public class ProtocolConstExtensions { /// - /// 自动获取 ProtocolConst 类中所有 Kafka 主题名称 + /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// - public static List GetAllTopicNames() + public static List GetAllTopicNamesByIssued() { return typeof(ProtocolConst) .GetFields(BindingFlags.Public | BindingFlags.Static) @@ -21,9 +23,36 @@ namespace JiShe.CollectBus.Protocol.Contracts f.IsLiteral && !f.IsInitOnly && f.FieldType == typeof(string) && - f.Name.EndsWith("EventName")) // 通过命名规则过滤主题字段 + f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段 .Select(f => (string)f.GetRawConstantValue()!) .ToList(); } + + /// + /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 + /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) + /// + public static List GetAllTopicNamesByReceived() + { + //固定的上报主题 + var topicList = typeof(ProtocolConst) + .GetFields(BindingFlags.Public | BindingFlags.Static) + .Where(f => + f.IsLiteral && + !f.IsInitOnly && + f.FieldType == typeof(string) && + f.Name.EndsWith("ReceivedEventName")) // 通过命名规则过滤主题字段 + .Select(f => (string)f.GetRawConstantValue()!) + .ToList(); + + //动态上报主题,需根据协议的AFN功能码动态获取 + var afnList = EnumExtensions.ToNameValueDictionary(); + foreach (var item in afnList) + { + topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0'))); + } + + return topicList; + } } } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs index 66e6ede..796023a 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs @@ -9,10 +9,28 @@ namespace JiShe.CollectBus.Protocol.Contracts public class ProtocolConst { public const string SubscriberGroup = "jishe.collectbus"; - public const string SubscriberIssuedEventName = "issued.event"; + /// + /// 心跳下行消息主题 + /// + public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event"; + /// + /// 登录下行消息主题 + /// + public const string SubscriberLoginIssuedEventName = "issued.login.event"; + + /// + /// 上行消息主题 + /// public const string SubscriberReceivedEventName = "received.event"; - public const string SubscriberReceivedHeartbeatEventName = "received.heartbeat.event"; - public const string SubscriberReceivedLoginEventName = "received.login.event"; + + /// + /// 心跳上行消息主题 + /// + public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event"; + /// + /// 登录上行消息主题 + /// + public const string SubscriberLoginReceivedEventName = "received.login.event"; #region 电表消息主题 /// @@ -47,7 +65,7 @@ namespace JiShe.CollectBus.Protocol.Contracts /// 电表手动抄读 /// public const string AmmeterSubscriberWorkerManualValveReadingIssuedEventName = "issued.manual.reading.ammeter.event"; - + #endregion #region 水表消息主题 @@ -85,7 +103,10 @@ namespace JiShe.CollectBus.Protocol.Contracts public const string WatermeterSubscriberWorkerManualValveReadingIssuedEventName = "issued.manual.reading.watermeter.event"; #endregion - + /// + /// AFN上行主题格式 + /// + public const string AFNTopicNameFormat = "received.afn{0}.event"; } }