From 0d4c7807273bb64d6ecc0ee8e4a5372c222c0fa8 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Wed, 16 Apr 2025 09:54:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=9D=E7=95=99CAP=E7=BB=84=E4=BB=B6?= =?UTF-8?q?=EF=BC=8C=E5=90=8E=E7=BB=AD=E5=86=B3=E5=AE=9A=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E7=A7=BB=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.sln | 7 + .../EnergySystem/EnergySystemAppService.cs | 152 +++++++++++++++++- .../JiShe.CollectBus.Application.csproj | 1 + .../Plugins/TcpMonitor.cs | 23 ++- .../Samples/SampleAppService.cs | 2 +- .../BasicScheduledMeterReadingService.cs | 14 +- ...nergySystemScheduledMeterReadingService.cs | 3 +- .../Subscribers/SubscriberAppService.cs | 10 +- .../Subscribers/WorkerSubscriberAppService.cs | 7 +- .../CollectBusHostModule.cs | 2 +- .../AdminClient/AdminClientService.cs | 13 ++ .../AdminClient/IAdminClientService.cs | 7 + .../Attributes/KafkaSubscribeAttribute.cs | 22 +-- .../CollectBusKafkaModule.cs | 4 +- .../Consumer/ConsumerService.cs | 24 ++- .../KafkaSubcribesExtensions.cs | 30 +++- .../Producer/IProducerService.cs | 2 +- .../Producer/ProducerService.cs | 12 +- .../Abstracts/BaseProtocolPlugin.cs | 8 +- ...JiShe.CollectBus.Protocol.Contracts.csproj | 1 + 20 files changed, 299 insertions(+), 45 deletions(-) diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 9c67aae..ea5e278 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -113,6 +115,10 @@ Global {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -135,6 +141,7 @@ Global {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} + {FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index a210d28..2a60803 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -5,6 +5,7 @@ using System.Net; using System.Text; using System.Threading.Tasks; using DeviceDetectorNET.Class.Device; +using DotNetCore.CAP; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; @@ -30,14 +31,16 @@ namespace JiShe.CollectBus.EnergySystem private readonly IRepository _csqRecordRepository; private readonly IRepository _conrOnlineRecordRepository; private readonly IProducerService _producerService; + private readonly ICapPublisher _capBus; public EnergySystemAppService(IRepository focusRecordRepository, IRepository csqRecordRepository, - IRepository conrOnlineRecordRepository, IProducerService producerService) + IRepository conrOnlineRecordRepository, IProducerService producerService, ICapPublisher capBus) { _focusRecordRepository = focusRecordRepository; _csqRecordRepository = csqRecordRepository; _conrOnlineRecordRepository = conrOnlineRecordRepository; _producerService = producerService; + _capBus = capBus; } /// @@ -71,7 +74,15 @@ namespace JiShe.CollectBus.EnergySystem if (bytes == null) return result; - + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -109,6 +120,15 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -117,6 +137,7 @@ namespace JiShe.CollectBus.EnergySystem Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() }.Serialize()); + } return result; @@ -150,6 +171,14 @@ namespace JiShe.CollectBus.EnergySystem }).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -179,6 +208,15 @@ namespace JiShe.CollectBus.EnergySystem { var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress); var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None); + + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -262,6 +300,15 @@ namespace JiShe.CollectBus.EnergySystem if (bytes != null) { + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -321,6 +368,15 @@ namespace JiShe.CollectBus.EnergySystem var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort, backupIP, backupPort, input.Data.APN); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -348,6 +404,15 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address); + + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -376,6 +441,14 @@ namespace JiShe.CollectBus.EnergySystem bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送 var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -403,6 +476,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildConrRebootSendCmd(address); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -431,6 +512,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -480,6 +569,15 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -488,7 +586,7 @@ namespace JiShe.CollectBus.EnergySystem Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() }.Serialize()); - + } result.Status = true; result.Msg = "操作成功"; @@ -549,6 +647,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -578,6 +684,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime, input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList()); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -606,6 +720,14 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -632,6 +754,14 @@ namespace JiShe.CollectBus.EnergySystem var result = new BaseResultDto(); var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -659,6 +789,14 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{data.AreaCode}{data.Address}"; var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address); + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, @@ -714,6 +852,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { + //await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + //{ + // //ClientId = messageReceived.ClientId, + // DeviceNo = address, + // Message = bytes, + // Type = IssuedEventType.Data, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index 24db5a5..3fa551c 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -15,6 +15,7 @@ + diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 7af99bc..50b41b0 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; +using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; @@ -27,6 +28,7 @@ namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin { + private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _deviceRepository; @@ -39,11 +41,12 @@ namespace JiShe.CollectBus.Plugins /// /// /// - public TcpMonitor(IProducerService producerService, + public TcpMonitor(ICapPublisher producerBus, IProducerService producerService, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) { + _producerBus = producerBus; _producerService = producerService; _logger = logger; _deviceRepository = deviceRepository; @@ -171,6 +174,10 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; + + //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize()); //await _producerBus.Publish( messageReceivedLoginEvent); @@ -218,6 +225,8 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; + //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize()); //await _producerBus.Publish(messageReceivedHeartbeatEvent); } @@ -242,10 +251,18 @@ namespace JiShe.CollectBus.Plugins // MessageId = NewId.NextGuid().ToString() //}); - + //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); //todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算? - + //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + //{ + // ClientId = client.Id, + // ClientIp = client.IP, + // ClientPort = client.Port, + // MessageHexString = messageHexString, + // DeviceNo = deviceNo, + // MessageId = NewId.NextGuid().ToString() + //}); await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived { ClientId = client.Id, diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 78b472d..d4c9a59 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -215,7 +215,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS return aa == null; } - [KafkaSubscribe(["test-topic"])] + [KafkaSubscribe("test-topic1")] public async Task KafkaSubscribeAsync(object obj) { diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 0f601d9..8fa4f5e 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Ammeters; +using DotNetCore.CAP; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -31,14 +32,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; - + private readonly ICapPublisher _producerBus; public BasicScheduledMeterReadingService( ILogger logger, + ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IIoTDBProvider dbProvider) { + _producerBus = producerBus; _logger = logger; _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; @@ -377,6 +380,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize()); //_= _producerBus.Publish(tempMsg); @@ -441,6 +446,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize()); //_ = _producerBus.Publish(tempMsg); @@ -505,6 +512,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); @@ -841,6 +849,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); @@ -1149,6 +1158,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; + //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index c1e45bd..3b0da49 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using Confluent.Kafka; +using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -35,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider) + ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index fb469ac..3ec6936 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Enums; +using DotNetCore.CAP; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDBProvider; @@ -21,7 +22,7 @@ using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { - public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe + public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; @@ -65,6 +66,7 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) { bool isAck = false; @@ -97,6 +99,7 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) { bool isAck = false; @@ -127,6 +130,7 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] public async Task ReceivedEvent(MessageReceived receivedMessage) { var currentTime = Clock.Now; @@ -183,6 +187,7 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); @@ -199,6 +204,7 @@ namespace JiShe.CollectBus.Subscribers } [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 95b563c..47cff47 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; +using DotNetCore.CAP; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -25,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers /// 定时抄读任务消息消费订阅 /// [Route($"/worker/app/subscriber")] - public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe + public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, ICapSubscribe, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; @@ -65,6 +66,7 @@ namespace JiShe.CollectBus.Subscribers [HttpPost] [Route("ammeter/oneminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] + //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); @@ -93,6 +95,7 @@ namespace JiShe.CollectBus.Subscribers [HttpPost] [Route("ammeter/fiveminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] + //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); @@ -121,6 +124,7 @@ namespace JiShe.CollectBus.Subscribers [HttpPost] [Route("ammeter/fifteenminute/issued-event")] [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + //[CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); @@ -160,6 +164,7 @@ namespace JiShe.CollectBus.Subscribers [HttpPost] [Route("watermeter/fifteenminute/issued-event")] [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] + //[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 377453e..aabc2ba 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); ConfigureHangfire(context); - //ConfigureCap(context, configuration); + ConfigureCap(context, configuration); //ConfigureMassTransit(context, configuration); //ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs index 26d028e..59e34fa 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -183,6 +183,19 @@ namespace JiShe.CollectBus.Kafka.AdminClient return partitions.Any(p => p.PartitionId == targetPartition); } + /// + /// 获取主题的分区数量 + /// + /// + /// + public int GetTopicPartitionsNum(string topic) + { + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return 0; + return metadata.Topics[0].Partitions.Count; + } + public void Dispose() { Instance?.Dispose(); diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs index 238a130..92121c5 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs @@ -52,5 +52,12 @@ namespace JiShe.CollectBus.Kafka.AdminClient /// /// bool CheckPartitionsExist(string topic, int targetPartition); + + /// + /// 获取主题的分区数量 + /// + /// + /// + int GetTopicPartitionsNum(string topic); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs index 7a059e0..32f652e 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus.Kafka.Attributes /// /// 订阅的主题 /// - public string[] Topics { get; set; } + public string Topic { get; set; } /// /// 分区 @@ -24,28 +24,20 @@ namespace JiShe.CollectBus.Kafka.Attributes /// public string GroupId { get; set; } - public KafkaSubscribeAttribute(string[] topics, string groupId = "default") - { - this.Topics = topics; - this.GroupId = groupId; - } + /// + /// 任务数(默认是多少个分区多少个任务) + /// + public int TaskCount { get; set; } = -1; public KafkaSubscribeAttribute(string topic, string groupId = "default") { - this.Topics = new string[] { topic }; + this.Topic = topic; this.GroupId = groupId; } - public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default") - { - this.Topics = topics; - this.Partition = partition; - this.GroupId = groupId; - } - public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default") { - this.Topics = new string[] { topic }; + this.Topic = topic ; this.Partition = partition; this.GroupId = groupId; } diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 153e5ef..fe0e866 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -17,9 +17,9 @@ namespace JiShe.CollectBus.Kafka public override void ConfigureServices(ServiceConfigurationContext context) { // 注册Producer - context.Services.AddTransient(); + context.Services.AddSingleton(); // 注册Consumer - context.Services.AddTransient(); + context.Services.AddSingleton(); } public override void OnApplicationInitialization(ApplicationInitializationContext context) diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index f59385f..9a2142e 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -9,6 +9,7 @@ using static Confluent.Kafka.ConfigPropertyNames; using System.Collections.Concurrent; using System.Text.RegularExpressions; using NUglify.Html; +using Serilog; namespace JiShe.CollectBus.Kafka.Consumer { @@ -37,6 +38,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { var config = BuildConsumerConfig(groupId); return new ConsumerBuilder(config) + .SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}")) .SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}")) .Build(); } @@ -50,7 +52,9 @@ namespace JiShe.CollectBus.Kafka.Consumer BootstrapServers = _configuration["Kafka:BootstrapServers"], GroupId = groupId ?? "default", AutoOffsetReset = AutoOffsetReset.Earliest, - EnableAutoCommit = false // 禁止AutoCommit + EnableAutoCommit = false, // 禁止AutoCommit + EnablePartitionEof = true, // 启用分区末尾标记 + AllowAutoCreateTopics= true // 启用自动创建 }; if (enableAuth) @@ -119,6 +123,15 @@ namespace JiShe.CollectBus.Kafka.Consumer try { var result = consumer.Consume(cts.Token); + if (result == null) continue; + if (result.Message.Value == null) continue; + if (result.IsPartitionEOF) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(TimeSpan.FromSeconds(1)); + continue; + } + bool sucess= await messageHandler(result.Message.Key, result.Message.Value); if (sucess) { @@ -164,6 +177,15 @@ namespace JiShe.CollectBus.Kafka.Consumer try { var result = consumer.Consume(cts.Token); + if (result == null) continue; + if (result.Message == null) continue; + if (result.IsPartitionEOF) + { + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(TimeSpan.FromSeconds(1)); + continue; + } + bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index d3ce904..cfe5eee 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -2,6 +2,7 @@ using DeviceDetectorNET; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; using Microsoft.AspNetCore.Builder; @@ -41,6 +42,9 @@ namespace JiShe.CollectBus.Kafka lifetime.ApplicationStarted.Register(() => { + var logger = provider.GetRequiredService>(); + int threadCount = 0; + int topicCount = 0; foreach (var subscribeType in subscribeTypes) { var subscribes = provider.GetServices(subscribeType).ToList(); @@ -48,10 +52,13 @@ namespace JiShe.CollectBus.Kafka if(subscribe is IKafkaSubscribe) { - BuildKafkaSubscriber(subscribe, provider); + Tuple tuple= BuildKafkaSubscriber(subscribe, provider, logger); + threadCount+= tuple.Item1; + topicCount+= tuple.Item2; } }); } + logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); }); } @@ -60,16 +67,27 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider) + private static Tuple BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger logger) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) .Where(x => x.Attribute != null) .ToArray(); + + int threadCount = 0; foreach (var sub in subscribedMethods) { - Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe)); + var adminClientService = provider.GetRequiredService(); + int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; + if (partitionCount <= 0) + partitionCount = 1; + for (int i = 0; i < partitionCount; i++) + { + Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); + threadCount++; + } } + return Tuple.Create(threadCount, subscribedMethods.Length); } /// @@ -80,11 +98,11 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe) + private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); - var logger = provider.GetRequiredService>(); - await consumerService.SubscribeAsync(attr.Topics, async (message) => + + await consumerService.SubscribeAsync(attr.Topic, async (message) => { try { diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs index b00f5cf..a401775 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer Task ProduceAsync(string topic, TKey key, TValue value, int? partition, Action>? deliveryHandler = null) where TKey : notnull where TValue : class; - Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class; + Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index c322294..8c069a9 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -9,6 +9,7 @@ using JiShe.CollectBus.Kafka.Consumer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; +using YamlDotNet.Serialization; namespace JiShe.CollectBus.Kafka.Producer { @@ -62,6 +63,7 @@ namespace JiShe.CollectBus.Kafka.Producer LingerMs = 20, // 修改等待时间为20ms Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader MessageSendMaxRetries = 50, // 消息发送失败最大重试50次 + MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs }; if (enableAuth) @@ -114,8 +116,8 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TValue value) where TValue : class { - var producer = GetProducer(); - await producer.ProduceAsync(topic, new Message { Value = value }); + var producer = GetProducer(); + await producer.ProduceAsync(topic, new Message { Value = value }); } /// @@ -160,13 +162,13 @@ namespace JiShe.CollectBus.Kafka.Producer /// /// /// - public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class + public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class { - var message = new Message + var message = new Message { Value = value }; - var producer = GetProducer(); + var producer = GetProducer(); if (partition.HasValue) { var topicPartition = new TopicPartition(topic, partition.Value); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index d066a28..963d89b 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -13,11 +13,13 @@ using JiShe.CollectBus.IotSystems.Protocols; using MassTransit; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Common.Helpers; +using DotNetCore.CAP; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { public abstract class BaseProtocolPlugin : IProtocolPlugin { + private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; @@ -38,7 +40,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); - _producerService = serviceProvider.GetRequiredService(); + _producerService = serviceProvider.GetRequiredService(); + _producerBus = serviceProvider.GetRequiredService(); } public abstract ProtocolInfo Info { get; } @@ -87,6 +90,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize()); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); @@ -127,6 +131,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize()); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 248ee30..2721676 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -8,6 +8,7 @@ +