From 5a294b437c97721f1704315c1c07bdbe29f895da Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Tue, 1 Apr 2025 22:50:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Plugins/TcpMonitor.cs | 27 ++++++++++---- .../BasicScheduledMeterReadingService.cs | 35 +++++++++++++------ ...nergySystemScheduledMeterReadingService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 7 ++-- .../CollectBusHostModule.Configure.cs | 24 ++++++++----- .../CollectBusHostModule.cs | 4 +-- .../Abstracts/BaseProtocolPlugin.cs | 12 ++++--- 7 files changed, 73 insertions(+), 38 deletions(-) diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index b3b43d3..4801404 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin { - private readonly ICapPublisher _capBus; + private readonly IPublishEndpoint _producerBus; private readonly ILogger _logger; private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; @@ -32,16 +32,16 @@ namespace JiShe.CollectBus.Plugins /// /// /// - /// + /// /// /// /// - public TcpMonitor(ICapPublisher capBus, + public TcpMonitor(IPublishEndpoint producerBus, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) { - _capBus = capBus; + _producerBus = producerBus; _logger = logger; _deviceRepository = deviceRepository; _ammeterInfoCache = ammeterInfoCache; @@ -161,7 +161,9 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); + + await _producerBus.Publish( messageReceivedLoginEvent); } private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo) @@ -198,12 +200,13 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); + await _producerBus.Publish(messageReceivedHeartbeatEvent); } private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo) { - await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + await _producerBus.Publish(new MessageReceived { ClientId = client.Id, ClientIp = client.IP, @@ -212,6 +215,16 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }); + + //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + //{ + // ClientId = client.Id, + // ClientIp = client.IP, + // ClientPort = client.Port, + // MessageHexString = messageHexString, + // DeviceNo = deviceNo, + // MessageId = NewId.NextGuid().ToString() + //}); } } } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index b1ea2d6..f65e51f 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -39,15 +39,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; - private readonly ICapPublisher _capBus; + private readonly IPublishEndpoint _producerBus; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; public BasicScheduledMeterReadingService( ILogger logger, - ICapPublisher capBus, + IPublishEndpoint producerBus, IMeterReadingRecordRepository meterReadingRecordsRepository) { - _capBus = capBus; + _producerBus = producerBus; _logger = logger; _meterReadingRecordsRepository = meterReadingRecordsRepository; } @@ -298,8 +298,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + _= _producerBus.Publish(tempMsg); + + meterTaskInfosList.Add(ammerterItem.Value); } } @@ -363,7 +365,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - _= _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); + //_= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); + + _ = _producerBus.Publish(tempMsg); meterTaskInfosList.Add(ammerterItem.Value); } @@ -433,9 +437,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading TimeDensity = timeDensity.ToString(), }; - _ = _capBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); - //await _massTransitBus.Publish(tempMsg); + _ = _producerBus.Publish(tempMsg); meterTaskInfosList.Add(ammerterItem.Value); } @@ -823,7 +827,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + + _ = _producerBus.Publish(tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } @@ -889,7 +896,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + + _ = _producerBus.Publish(tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } @@ -954,7 +964,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + + _ = _producerBus.Publish(tempMsg); + meterTaskInfosList.Add(ammerterItem.Value); } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 70cecfa..78276e9 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -29,7 +29,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher capBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, capBus, meterReadingRecordsRepository) + IPublishEndpoint producerBus, IMeterReadingRecordRepository meterReadingRecordsRepository) : base(logger, producerBus, meterReadingRecordsRepository) { } diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index b2f7400..242b216 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -62,19 +62,18 @@ namespace JiShe.CollectBus.Subscribers [CapSubscribe(ProtocolConst.SubscriberIssuedEventName)] public async Task IssuedEvent(IssuedEventMessage issuedEventMessage) - { - _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 回复下发内容IssuedEvent:{issuedEventMessage.MessageId}"); - + { switch (issuedEventMessage.Type) { case IssuedEventType.Heartbeat: - _logger.LogInformation($"IssuedEvent:{issuedEventMessage.MessageId}"); + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 心跳回复下发内容:{issuedEventMessage.Serialize()}"); var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); heartbeatEntity.AckTime = Clock.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); break; case IssuedEventType.Login: + _logger.LogWarning($"集中器地址{issuedEventMessage.ClientId} 登录回复下发内容:{issuedEventMessage.Serialize()}"); var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == issuedEventMessage.MessageId); loginEntity.AckTime = Clock.Now; loginEntity.IsAck = true; diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 374eaa5..f0a02c9 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -21,6 +21,7 @@ using JiShe.CollectBus.Consumers; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageIssueds; +using Confluent.Kafka; namespace JiShe.CollectBus.Host @@ -279,9 +280,13 @@ namespace JiShe.CollectBus.Host /// The configuration. public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) { + + //context.Services.AddSingleton(); + context.Services.AddMassTransit(x => { x.UsingInMemory(); + x.AddConfigureEndpointsCallback((c, name, cfg) => { @@ -308,37 +313,38 @@ namespace JiShe.CollectBus.Host rider.UsingKafka((c, cfg) => { + cfg.Host(configuration.GetConnectionString("Kafka")); cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator => { configurator.ConfigureConsumer(c); - configurator.ConfigureConsumeTopology = false; + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; }); cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator => { configurator.ConfigureConsumer(c); - configurator.ConfigureConsumeTopology = false; + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; }); cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator => { configurator.ConfigureConsumer(c); - configurator.ConfigureConsumeTopology = false; + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; }); cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator => { configurator.ConfigureConsumer(c); - configurator.ConfigureConsumeTopology = false; + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; }); - //cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator => - //{ - // configurator.ConfigureConsumer(c); - // configurator.ConfigureConsumeTopology = false; - //}); + cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator => + { + configurator.ConfigureConsumer(c); + configurator.AutoOffsetReset = AutoOffsetReset.Earliest; + }); }); }); }); diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 6f24b1e..263be23 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -43,8 +43,8 @@ namespace JiShe.CollectBus.Host ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); ConfigureHangfire(context); - ConfigureCap(context, configuration); - //ConfigureMassTransit(context, configuration); + //ConfigureCap(context, configuration); + ConfigureMassTransit(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 978a056..2fbe138 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -11,12 +11,13 @@ using JiShe.CollectBus.Protocol.Contracts.AnalysisData; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; +using MassTransit; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { public abstract class BaseProtocolPlugin : IProtocolPlugin { - private readonly ICapPublisher _capBus; + private readonly IPublishEndpoint _producerBus; private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; @@ -36,7 +37,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); - _capBus = serviceProvider.GetRequiredService(); + _producerBus = serviceProvider.GetRequiredService(); } public abstract ProtocolInfo Info { get; } @@ -86,7 +87,8 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); + await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Login, MessageId = messageReceived.MessageId }); } /// @@ -124,7 +126,9 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts Fn = 1 }; var bytes = Build3761SendData.BuildSendCommandBytes(reqParam); - await _capBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + //await _producerBus.PublishAsync(ProtocolConst.SubscriberIssuedEventName, new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); + + await _producerBus.Publish(new IssuedEventMessage { ClientId = messageReceived.ClientId, DeviceNo = messageReceived.DeviceNo, Message = bytes, Type = IssuedEventType.Heartbeat, MessageId = messageReceived.MessageId }); } }