From 11d3fcf162ce187880a8470ad3646d37f50cc548 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 15 Apr 2025 18:58:38 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4CAP=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...he.CollectBus.Application.Contracts.csproj | 1 + .../Subscribers/ISubscriberAppService.cs | 11 +++--- .../IWorkerSubscriberAppService.cs | 9 +++-- .../JiShe.CollectBus.Application.csproj | 1 - .../Plugins/TcpMonitor.cs | 18 +++++---- .../Samples/SampleAppService.cs | 11 +----- .../BasicScheduledMeterReadingService.cs | 13 +++---- ...nergySystemScheduledMeterReadingService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 39 ++++++++++++------- .../Subscribers/WorkerSubscriberAppService.cs | 24 +++++++----- .../CollectBusHostModule.cs | 2 +- .../Pages/Monitor.cshtml | 1 - .../Consumer/ConsumerService.cs | 6 +-- .../Producer/IProducerService.cs | 2 +- .../Producer/ProducerService.cs | 10 ++--- .../Abstracts/BaseProtocolPlugin.cs | 10 +++-- ...JiShe.CollectBus.Protocol.Contracts.csproj | 3 +- 17 files changed, 89 insertions(+), 74 deletions(-) diff --git a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj index de972d1..6e4fed5 100644 --- a/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj +++ b/src/JiShe.CollectBus.Application.Contracts/JiShe.CollectBus.Application.Contracts.csproj @@ -26,6 +26,7 @@ + diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs index f9bc706..658ff29 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/ISubscriberAppService.cs @@ -1,16 +1,17 @@ using System.Threading.Tasks; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.Kafka; using Volo.Abp.Application.Services; namespace JiShe.CollectBus.Subscribers { public interface ISubscriberAppService : IApplicationService { - Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); - Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); - Task ReceivedEvent(MessageReceived receivedMessage); - Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); - Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); + Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage); + Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage); + Task ReceivedEvent(MessageReceived receivedMessage); + Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage); + Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage); } } diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index abba774..9a37167 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using JiShe.CollectBus.Kafka; using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -19,19 +20,19 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); /// /// 5分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); /// /// 15分钟采集电表数据下行消息消费订阅 /// /// - Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); #endregion #region 水表消息采集 @@ -39,7 +40,7 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集水表数据下行消息消费订阅 /// /// - Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); #endregion } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index a2cc613..24db5a5 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -23,7 +23,6 @@ - diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index ff4de0c..29427fc 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -7,10 +7,12 @@ using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Enums; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using MassTransit; using Microsoft.Extensions.Logging; @@ -26,7 +28,7 @@ namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin { - private readonly ICapPublisher _producerBus; + private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; @@ -34,16 +36,16 @@ namespace JiShe.CollectBus.Plugins /// /// /// - /// + /// /// /// /// - public TcpMonitor(ICapPublisher producerBus, + public TcpMonitor(IProducerService producerService, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) { - _producerBus = producerBus; + _producerService = producerService; _logger = logger; _deviceRepository = deviceRepository; _ammeterInfoCache = ammeterInfoCache; @@ -170,7 +172,7 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent.Serialize()); //await _producerBus.Publish( messageReceivedLoginEvent); } @@ -217,7 +219,7 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent.Serialize()); //await _producerBus.Publish(messageReceivedHeartbeatEvent); } @@ -245,7 +247,7 @@ namespace JiShe.CollectBus.Plugins //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); //todo 如何确定时标?目前集中器的采集频率,都是固定,数据上报的时候,根据当前时间,往后推测出应当采集的时间点作为时标。但是如果由于网络问题,数据一直没上报的情况改怎么计算? - await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived { ClientId = client.Id, ClientIp = client.IP, @@ -253,7 +255,7 @@ namespace JiShe.CollectBus.Plugins MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } } } diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 8f5449a..78b472d 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -215,19 +215,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS return aa == null; } - [KafkaSubscribe(["test-topic1"])] + [KafkaSubscribe(["test-topic"])] - public async Task KafkaSubscribeAsync() // TestSubscribe obj + public async Task KafkaSubscribeAsync(object obj) { - var obj=string.Empty; _logger.LogWarning($"收到订阅消息: {obj}"); return SubscribeAck.Success(); } } -public class TestSubscribe -{ - public string Topic { get; set; } - public int Val { get; set; } -} - diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 287158f..f2a08b1 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -29,7 +29,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; - private readonly ICapPublisher _producerBus; private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; @@ -37,12 +36,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading public BasicScheduledMeterReadingService( ILogger logger, - ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IIoTDBProvider dbProvider) { - _producerBus = producerBus; _logger = logger; _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; @@ -381,7 +378,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg.Serialize()); //_= _producerBus.Publish(tempMsg); @@ -445,7 +442,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg.Serialize()); //_ = _producerBus.Publish(tempMsg); @@ -510,7 +507,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading TimeDensity = timeDensity.ToString(), }; - _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); //_ = _producerBus.Publish(tempMsg); @@ -805,7 +802,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); - await _producerService.ProduceAsync(topicName, partition, taskRecord); + await _producerService.ProduceAsync(topicName, partition, taskRecord.Serialize()); } private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) @@ -846,7 +843,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading TimeDensity = timeDensity.ToString(), }; - _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg.Serialize()); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 91f7d4a..8593dfd 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) + IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, meterReadingRecordRepository, producerService,dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index d5e9caf..9bf297f 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -6,6 +6,8 @@ using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; @@ -20,7 +22,7 @@ using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { - public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe + public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; @@ -63,9 +65,10 @@ namespace JiShe.CollectBus.Subscribers _dbProvider = dbProvider; } - [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] - public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) { + bool isAck = false; switch (issuedEventMessage.Type) { case IssuedEventType.Heartbeat: @@ -76,6 +79,7 @@ namespace JiShe.CollectBus.Subscribers loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); + isAck = true; break; case IssuedEventType.Data: break; @@ -90,11 +94,13 @@ namespace JiShe.CollectBus.Subscribers //} await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck? SubscribeAck.Success(): SubscribeAck.Fail(); } - [CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] - public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) { + bool isAck = false; switch (issuedEventMessage.Type) { case IssuedEventType.Heartbeat: @@ -103,6 +109,7 @@ namespace JiShe.CollectBus.Subscribers heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); + isAck = true; break; case IssuedEventType.Data: break; @@ -117,10 +124,11 @@ namespace JiShe.CollectBus.Subscribers //} await _tcpService.SendAsync(issuedEventMessage.ClientId, issuedEventMessage.Message); + return isAck ? SubscribeAck.Success() : SubscribeAck.Fail(); } - [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] - public async Task ReceivedEvent(MessageReceived receivedMessage) + [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] + public async Task ReceivedEvent(MessageReceived receivedMessage) { var currentTime = Clock.Now; @@ -137,13 +145,13 @@ namespace JiShe.CollectBus.Subscribers if(fN == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return; + return SubscribeAck.Success(); } var tb3761FN = fN.FnList.FirstOrDefault(); if (tb3761FN == null) { Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); - return; + return SubscribeAck.Success(); } //报文入库 @@ -169,11 +177,14 @@ namespace JiShe.CollectBus.Subscribers //todo 查找是否有下发任务 //await _messageReceivedEventRepository.InsertAsync(receivedMessage); + + } + return SubscribeAck.Success(); } - [CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] - public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) @@ -185,10 +196,11 @@ namespace JiShe.CollectBus.Subscribers await protocolPlugin.HeartbeatAsync(receivedHeartbeatMessage); await _messageReceivedHeartbeatEventRepository.InsertAsync(receivedHeartbeatMessage); } + return SubscribeAck.Success(); } - [CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] - public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) { var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); if (protocolPlugin == null) @@ -200,6 +212,7 @@ namespace JiShe.CollectBus.Subscribers await protocolPlugin.LoginAsync(receivedLoginMessage); await _messageReceivedLoginEventRepository.InsertAsync(receivedLoginMessage); } + return SubscribeAck.Success(); } } } diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 3caba20..f3c485c 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -8,6 +8,8 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Repository.MeterReadingRecord; @@ -24,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers /// 定时抄读任务消息消费订阅 /// [Route($"/worker/app/subscriber")] - public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService,ICapSubscribe + public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; @@ -63,8 +65,8 @@ namespace JiShe.CollectBus.Subscribers /// [HttpPost] [Route("ammeter/oneminute/issued-event")] - [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] + public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); @@ -81,6 +83,7 @@ namespace JiShe.CollectBus.Subscribers } } + return SubscribeAck.Success(); } /// @@ -90,8 +93,8 @@ namespace JiShe.CollectBus.Subscribers /// [HttpPost] [Route("ammeter/fiveminute/issued-event")] - [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] + public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); @@ -108,6 +111,7 @@ namespace JiShe.CollectBus.Subscribers } } + return SubscribeAck.Success(); } /// @@ -117,8 +121,8 @@ namespace JiShe.CollectBus.Subscribers /// [HttpPost] [Route("ammeter/fifteenminute/issued-event")] - [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); try @@ -137,6 +141,7 @@ namespace JiShe.CollectBus.Subscribers } } + return SubscribeAck.Success(); } catch (Exception ex) { @@ -155,8 +160,8 @@ namespace JiShe.CollectBus.Subscribers /// [HttpPost] [Route("watermeter/fifteenminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] - public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] + public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); @@ -172,6 +177,7 @@ namespace JiShe.CollectBus.Subscribers await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); } } + return SubscribeAck.Success(); } #endregion } diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index aabc2ba..377453e 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -43,7 +43,7 @@ namespace JiShe.CollectBus.Host ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); ConfigureHangfire(context); - ConfigureCap(context, configuration); + //ConfigureCap(context, configuration); //ConfigureMassTransit(context, configuration); //ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); diff --git a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml index b438e18..aaadf3f 100644 --- a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,7 +16,6 @@ 后端服务 - diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 08b8cb1..f59385f 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -146,14 +146,14 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - var consumerKey = typeof((Null, TValue)); + var consumerKey = typeof((Ignore, TValue)); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _=> ( - CreateConsumer(groupId), + CreateConsumer(groupId), cts - )).Consumer as IConsumer; + )).Consumer as IConsumer; consumer!.Subscribe(topics); diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs index becea90..b00f5cf 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -15,6 +15,6 @@ namespace JiShe.CollectBus.Kafka.Producer Task ProduceAsync(string topic, TKey key, TValue value, int? partition, Action>? deliveryHandler = null) where TKey : notnull where TValue : class; - Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class; + Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index ca45f8c..c322294 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -114,8 +114,8 @@ namespace JiShe.CollectBus.Kafka.Producer /// public async Task ProduceAsync(string topic, TValue value) where TValue : class { - var producer = GetProducer(); - await producer.ProduceAsync(topic, new Message { Value = value }); + var producer = GetProducer(); + await producer.ProduceAsync(topic, new Message { Value = value }); } /// @@ -160,13 +160,13 @@ namespace JiShe.CollectBus.Kafka.Producer /// /// /// - public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class + public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class { - var message = new Message + var message = new Message { Value = value }; - var producer = GetProducer(); + var producer = GetProducer(); if (partition.HasValue) { var topicPartition = new TopicPartition(topic, partition.Value); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 762a2ca..23d0917 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -12,12 +12,14 @@ using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using MassTransit; +using JiShe.CollectBus.Kafka.Producer; +using JiShe.CollectBus.Common.Helpers; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { public abstract class BaseProtocolPlugin : IProtocolPlugin { - private readonly ICapPublisher _producerBus; + private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; @@ -37,7 +39,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); - _producerBus = serviceProvider.GetRequiredService(); + _producerService = serviceProvider.GetRequiredService(); } public abstract ProtocolInfo Info { get; } @@ -87,7 +89,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }.Serialize()); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } @@ -126,7 +128,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }.Serialize()); //await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index aa5bd73..248ee30 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -7,7 +7,7 @@ - + @@ -17,6 +17,7 @@ +