From dc7416bdbf18e5c5c3a93d5b6010655a55b12af2 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Fri, 18 Apr 2025 09:50:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4CAP=20=E5=92=8C=20MassTransit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Abstracts/BaseProtocolPlugin.cs | 4 - ...JiShe.CollectBus.Protocol.Contracts.csproj | 2 - .../JiShe.CollectBus.Protocol.Test.csproj | 1 - .../JiShe.CollectBus.Protocol.csproj | 1 - .../Consumers/IssuedConsumer.cs | 64 ---------- .../Consumers/IssuedFaultConsumer.cs | 15 --- .../Consumers/ReceivedConsumer.cs | 59 --------- .../Consumers/ReceivedFaultConsumer.cs | 15 --- .../Consumers/ReceivedHeartbeatConsumer.cs | 46 ------- .../Consumers/ReceivedLoginConsumer.cs | 47 ------- .../ScheduledMeterReadingConsumer.cs | 40 ------ .../EnergySystem/EnergySystemAppService.cs | 74 +++++------ .../JiShe.CollectBus.Application.csproj | 3 - .../Plugins/TcpMonitor.cs | 12 +- .../BasicScheduledMeterReadingService.cs | 3 +- ...nergySystemScheduledMeterReadingService.cs | 3 - .../Subscribers/SubscriberAppService.cs | 5 +- .../Subscribers/WorkerSubscriberAppService.cs | 3 +- .../CollectBusHostModule.Configure.cs | 120 ------------------ .../CollectBusHostModule.cs | 2 - .../JiShe.CollectBus.Host.csproj | 4 - web/JiShe.CollectBus.Host/appsettings.json | 38 ------ 22 files changed, 43 insertions(+), 518 deletions(-) delete mode 100644 services/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/IssuedFaultConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/ReceivedFaultConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/ReceivedHeartbeatConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/ReceivedLoginConsumer.cs delete mode 100644 services/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index bc066fe..e49d1be 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -10,8 +10,6 @@ using JiShe.CollectBus.Protocol.Contracts.AnalysisData; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; -using MassTransit; -using DotNetCore.CAP; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Common.Consts; @@ -19,7 +17,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { public abstract class BaseProtocolPlugin : IProtocolPlugin { - private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _protocolInfoRepository; @@ -41,7 +38,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts _logger = serviceProvider.GetRequiredService>(); _protocolInfoRepository = serviceProvider.GetRequiredService>(); _producerService = serviceProvider.GetRequiredService(); - _producerBus = serviceProvider.GetRequiredService(); } public abstract ProtocolInfo Info { get; } diff --git a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index cb60dbd..81469e8 100644 --- a/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/protocols/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -13,8 +13,6 @@ - - diff --git a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj index 899ebad..8b89e97 100644 --- a/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj +++ b/protocols/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj @@ -10,7 +10,6 @@ - diff --git a/protocols/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj b/protocols/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj index 1497183..3a7cc07 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj +++ b/protocols/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj @@ -10,7 +10,6 @@ - diff --git a/services/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs deleted file mode 100644 index b1041f2..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/IssuedConsumer.cs +++ /dev/null @@ -1,64 +0,0 @@ -using System; -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.IotSystems.MessageIssueds; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using MassTransit; -using Microsoft.Extensions.Logging; -using TouchSocket.Sockets; -using Volo.Abp.Domain.Repositories; - -namespace JiShe.CollectBus.Consumers -{ - public class IssuedConsumer: IConsumer - { - private readonly ILogger _logger; - private readonly ITcpService _tcpService; - private readonly IRepository _messageReceivedLoginEventRepository; - private readonly IRepository _messageReceivedHeartbeatEventRepository; - - /// - /// IssuedConsumer - /// - /// - /// - /// - /// - public IssuedConsumer(ILogger logger, - ITcpService tcpService, - IRepository messageReceivedLoginEventRepository, - IRepository messageReceivedHeartbeatEventRepository) - { - _logger = logger; - _tcpService = tcpService; - _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; - _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; - } - - - public async Task Consume(ConsumeContext context) - { - switch (context.Message.Type) - { - case IssuedEventType.Heartbeat: - _logger.LogInformation($"IssuedEvent:{context.Message.MessageId}"); - var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == context.Message.MessageId); - heartbeatEntity.AckTime = DateTime.Now; - heartbeatEntity.IsAck = true; - await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); - break; - case IssuedEventType.Login: - var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == context.Message.MessageId); - loginEntity.AckTime = DateTime.Now; - loginEntity.IsAck = true; - await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); - break; - case IssuedEventType.Data: - break; - default: - throw new ArgumentOutOfRangeException(); - } - await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/IssuedFaultConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/IssuedFaultConsumer.cs deleted file mode 100644 index 9bc9983..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/IssuedFaultConsumer.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Threading.Tasks; -using JiShe.CollectBus.IotSystems.MessageIssueds; -using MassTransit; - -namespace JiShe.CollectBus.Consumers -{ - public class IssuedFaultConsumer : IConsumer> - { - public Task Consume(ConsumeContext> context) - { - throw new NotImplementedException(); - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs deleted file mode 100644 index 4f7b5eb..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/ReceivedConsumer.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using JiShe.CollectBus.Protocol.Contracts.Models; -using MassTransit; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Volo.Abp.Domain.Repositories; - -namespace JiShe.CollectBus.Consumers -{ - /// - /// Batch 一次最多 100 个,最多 10 个并发批次 - /// - public class ReceivedConsumer: IConsumer> - { - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - private readonly IRepository _messageReceivedEventRepository; - - /// - /// MessageReceivedConsumer - /// - /// - /// - /// - public ReceivedConsumer(ILogger logger, - IServiceProvider serviceProvider, - IRepository messageReceivedEventRepository) - { - _logger = logger; - _serviceProvider = serviceProvider; - _messageReceivedEventRepository = messageReceivedEventRepository; - } - - - public async Task Consume(ConsumeContext> context) - { - const string protocolType = "Standard"; - var protocolPlugin = _serviceProvider.GetKeyedService(protocolType); - if (protocolPlugin == null) - { - _logger.LogError("协议不存在!"); - } - else - { - var list = new List(); - foreach (var contextItem in context.Message) - { - await protocolPlugin.AnalyzeAsync(contextItem.Message); - list.Add(contextItem.Message); - } - await _messageReceivedEventRepository.InsertManyAsync(list); - } - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/ReceivedFaultConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/ReceivedFaultConsumer.cs deleted file mode 100644 index 60bbdfc..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/ReceivedFaultConsumer.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Threading.Tasks; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using MassTransit; - -namespace JiShe.CollectBus.Consumers -{ - public class ReceivedFaultConsumer: IConsumer>> - { - public Task Consume(ConsumeContext>> context) - { - throw new NotImplementedException(); - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/ReceivedHeartbeatConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/ReceivedHeartbeatConsumer.cs deleted file mode 100644 index c2317c6..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/ReceivedHeartbeatConsumer.cs +++ /dev/null @@ -1,46 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using MassTransit; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace JiShe.CollectBus.Consumers -{ - public class ReceivedHeartbeatConsumer:IConsumer - { - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - - /// - /// - /// - /// - /// - public ReceivedHeartbeatConsumer(IServiceProvider serviceProvider, ILogger logger) - { - this._serviceProvider = serviceProvider; - this._logger = logger; - } - - public async Task Consume(ConsumeContext context) - { - _logger.LogInformation("心跳消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("Standard"); - if (protocolPlugin == null) - { - _logger.LogError("【心跳消费队列开始处理】协议不存在!"); - } - else - { - await protocolPlugin.HeartbeatAsync(context.Message); - _logger.LogInformation("心跳消费队列完成处理"); - } - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/ReceivedLoginConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/ReceivedLoginConsumer.cs deleted file mode 100644 index ce67886..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/ReceivedLoginConsumer.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using MassTransit; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace JiShe.CollectBus.Consumers -{ - public class ReceivedLoginConsumer : IConsumer - { - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - - /// - /// - /// - /// - /// - public ReceivedLoginConsumer(ILogger logger, IServiceProvider serviceProvider) - { - _logger = logger; - _serviceProvider = serviceProvider; - } - - public async Task Consume(ConsumeContext context) - { - _logger.LogInformation("登录消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("Standard"); - if (protocolPlugin == null) - { - _logger.LogError("【登录消费队列开始处理】协议不存在!"); - } - else - { - await protocolPlugin.LoginAsync(context.Message); - _logger.LogInformation("登录消费队列完成处理"); - } - await Task.CompletedTask; - } - } -} diff --git a/services/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs b/services/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs deleted file mode 100644 index cdc731a..0000000 --- a/services/JiShe.CollectBus.Application/Consumers/ScheduledMeterReadingConsumer.cs +++ /dev/null @@ -1,40 +0,0 @@ -using System; -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.IotSystems.MessageIssueds; -using MassTransit; -using Microsoft.Extensions.Logging; -using TouchSocket.Sockets; -using Volo.Abp.Domain.Repositories; - -namespace JiShe.CollectBus.Consumers -{ - /// - /// 定时抄读任务消费者 - /// - public class ScheduledMeterReadingConsumer : IConsumer - { - private readonly ILogger _logger; - private readonly ITcpService _tcpService; - - /// - /// WorkerConsumer - /// - /// - /// - public ScheduledMeterReadingConsumer(ILogger logger, - ITcpService tcpService) - { - _logger = logger; - _tcpService = tcpService; - } - - - public async Task Consume(ConsumeContext context) - { - _logger.LogError($"{nameof(ScheduledMeterReadingConsumer)} 集中器的消息消费{context.Message.FocusAddress}"); - await _tcpService.SendAsync(context.Message.FocusAddress, context.Message.MessageHexString); - } - } -} diff --git a/services/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/services/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 9fc8dd4..19a1296 100644 --- a/services/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/services/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -5,7 +5,6 @@ using System.Net; using System.Text; using System.Threading.Tasks; using DeviceDetectorNET.Class.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; @@ -18,7 +17,6 @@ using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.IotSystems.Records; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; -using MassTransit; using Microsoft.AspNetCore.Mvc; using Newtonsoft.Json; using Volo.Abp.Domain.Repositories; @@ -32,16 +30,14 @@ namespace JiShe.CollectBus.EnergySystem private readonly IRepository _csqRecordRepository; private readonly IRepository _conrOnlineRecordRepository; private readonly IProducerService _producerService; - private readonly ICapPublisher _capBus; public EnergySystemAppService(IRepository focusRecordRepository, IRepository csqRecordRepository, - IRepository conrOnlineRecordRepository, IProducerService producerService, ICapPublisher capBus) + IRepository conrOnlineRecordRepository, IProducerService producerService) { _focusRecordRepository = focusRecordRepository; _csqRecordRepository = csqRecordRepository; _conrOnlineRecordRepository = conrOnlineRecordRepository; _producerService = producerService; - _capBus = capBus; } /// @@ -81,7 +77,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage @@ -90,7 +86,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; result.Msg = "操作成功"; @@ -127,7 +123,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage @@ -136,7 +132,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } @@ -178,7 +174,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -186,7 +182,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; result.Msg = "操作成功"; @@ -216,7 +212,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -224,7 +220,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } @@ -307,7 +303,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage @@ -316,7 +312,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); if (isManual) @@ -375,7 +371,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage @@ -384,7 +380,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; @@ -412,7 +408,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -420,7 +416,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; @@ -448,7 +444,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -456,7 +452,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; @@ -483,7 +479,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -491,7 +487,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; @@ -519,7 +515,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -527,7 +523,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; result.Msg = "操作成功"; @@ -576,7 +572,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage @@ -585,7 +581,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } @@ -654,7 +650,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -662,7 +658,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } result.Status = true; @@ -691,7 +687,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -699,7 +695,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } result.Status = true; @@ -727,7 +723,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -735,7 +731,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } result.Status = true; @@ -761,7 +757,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -769,7 +765,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); result.Status = true; result.Msg = "操作成功"; @@ -796,7 +792,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -804,7 +800,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } result.Status = true; @@ -859,7 +855,7 @@ namespace JiShe.CollectBus.EnergySystem // DeviceNo = address, // Message = bytes, // Type = IssuedEventType.Data, - // MessageId = NewId.NextGuid().ToString() + // MessageId = Guid.NewGuid().ToString() //}); await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { @@ -867,7 +863,7 @@ namespace JiShe.CollectBus.EnergySystem DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } diff --git a/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index b473dea..b97fd0f 100644 --- a/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/services/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -15,9 +15,6 @@ - - - diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index c2bd026..5a99bf1 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; @@ -15,7 +14,6 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; -using MassTransit; using Microsoft.Extensions.Logging; using TouchSocket.Core; using TouchSocket.Sockets; @@ -29,7 +27,6 @@ namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin { - private readonly ICapPublisher _producerBus; private readonly IProducerService _producerService; private readonly ILogger _logger; private readonly IRepository _deviceRepository; @@ -42,12 +39,11 @@ namespace JiShe.CollectBus.Plugins /// /// /// - public TcpMonitor(ICapPublisher producerBus, IProducerService producerService, + public TcpMonitor(IProducerService producerService, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) { - _producerBus = producerBus; _producerService = producerService; _logger = logger; _deviceRepository = deviceRepository; @@ -173,7 +169,7 @@ namespace JiShe.CollectBus.Plugins ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }; //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent); @@ -224,7 +220,7 @@ namespace JiShe.CollectBus.Plugins ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }; //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent); @@ -271,7 +267,7 @@ namespace JiShe.CollectBus.Plugins ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, - MessageId = NewId.NextGuid().ToString() + MessageId = Guid.NewGuid().ToString() }); } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 42f0f1c..18f6780 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 733f14a..dfbcd26 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Confluent.Kafka; -using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.Consts; @@ -19,7 +17,6 @@ using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; -using MassTransit; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Configuration; diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index eb3abd9..4236855 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; @@ -23,7 +22,7 @@ using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { - public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe, IKafkaSubscribe + public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 65ecc01..b3102d0 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Devices; @@ -27,7 +26,7 @@ namespace JiShe.CollectBus.Subscribers /// 定时抄读任务消息消费订阅 /// [Route($"/worker/app/subscriber")] - public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, ICapSubscribe, IKafkaSubscribe + public class WorkerSubscriberAppService : CollectBusAppService, IWorkerSubscriberAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 82cc214..517d5fe 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -3,7 +3,6 @@ using Hangfire; using Hangfire.Redis.StackExchange; using JiShe.CollectBus.Host.Hangfire; using JiShe.CollectBus.Host.Swaggers; -using MassTransit; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.DataProtection; using Microsoft.IdentityModel.Tokens; @@ -17,14 +16,6 @@ using Volo.Abp.Modularity; using TouchSocket.Core; using TouchSocket.Sockets; using JiShe.CollectBus.Plugins; -using JiShe.CollectBus.Consumers; -using JiShe.CollectBus.Protocol.Contracts; -using JiShe.CollectBus.IotSystems.MessageReceiveds; -using JiShe.CollectBus.IotSystems.MessageIssueds; -using Confluent.Kafka; -using MassTransit.SqlTransport.Topology; -using Confluent.Kafka.Admin; -using JiShe.CollectBus.Common.Consts; namespace JiShe.CollectBus.Host @@ -250,116 +241,5 @@ namespace JiShe.CollectBus.Host .SetUdpDataHandlingAdapter(() => new NormalUdpDataHandlingAdapter()); }); } - - /// - /// Configures the cap. - /// - /// The context. - /// The configuration. - public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration) - { - context.Services.AddCap(x => - { - x.DefaultGroupName = ProtocolConst.SubscriberGroup; - var connectionStr = configuration.GetConnectionString(CollectBusDbProperties.MongoDbConnectionStringName); - x.UseMongoDB(connectionStr); //MongoDB 4.0+ cluster - var kafka = configuration.GetConnectionString("Kafka"); - x.UseKafka(option => - { - option.Servers = kafka; - if (!Convert.ToBoolean(configuration["Kafka:EnableAuthorization"])) return; - option.MainConfig.Add("security.protocol", configuration["Kafka:SecurityProtocol"]); - option.MainConfig.Add("sasl.mechanism", configuration["Kafka:SaslMechanism"]); - option.MainConfig.Add("sasl.username", configuration["Kafka:SaslUserName"]); - option.MainConfig.Add("sasl.password", configuration["Kafka:SaslPassword"]); - }); - - x.UseDashboard(); - x.FailedRetryInterval = 10; - x.FailedRetryCount = 5; - }); - - } - - /// - /// Configures the mass transit. - /// - /// The context. - /// The configuration. - /// - /// Configures the mass transit. - /// - public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) - { - var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; - var producerConfig = new ProducerConfig(); - - context.Services - .AddMassTransit(x => - { - x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context)); - - x.AddConfigureEndpointsCallback((c, name, cfg) => - { - cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); - cfg.UseMessageRetry(r => r.Immediate(5)); - cfg.UseInMemoryOutbox(c); - }); - - x.AddRider(rider => - { - rider.AddConsumer(); - rider.AddConsumer(); - rider.AddConsumer(); - rider.AddConsumer(cfg => - { - cfg.Options(options => options - .SetMessageLimit(100) - .SetTimeLimit(s: 1) - .SetTimeLimitStart(BatchTimeLimitStart.FromLast) - .SetConcurrencyLimit(10)); - }); - rider.AddConsumer(); - - rider.AddProducer(ProtocolConst.SubscriberLoginReceivedEventName); - rider.AddProducer(ProtocolConst.SubscriberHeartbeatReceivedEventName); - - rider.UsingKafka((c, cfg) => - { - cfg.Host(configuration.GetConnectionString("Kafka")); - - cfg.TopicEndpoint(ProtocolConst.SubscriberHeartbeatReceivedEventName, consumerConfig, configurator => - { - configurator.AutoOffsetReset = AutoOffsetReset.Earliest; - configurator.ConfigureConsumer(c); - }); - - cfg.TopicEndpoint(ProtocolConst.SubscriberLoginReceivedEventName, consumerConfig, configurator => - { - configurator.ConfigureConsumer(c); - configurator.AutoOffsetReset = AutoOffsetReset.Earliest; - }); - - cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator => - { - configurator.ConfigureConsumer(c); - configurator.AutoOffsetReset = AutoOffsetReset.Earliest; - }); - - cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator => - { - configurator.ConfigureConsumer(c); - configurator.AutoOffsetReset = AutoOffsetReset.Earliest; - }); - - cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator => - { - configurator.ConfigureConsumer(c); - configurator.AutoOffsetReset = AutoOffsetReset.Earliest; - }); - }); - }); - }); - } } } \ No newline at end of file diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs index aabc2ba..95e93c7 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -43,8 +43,6 @@ namespace JiShe.CollectBus.Host ConfigureNetwork(context, configuration); ConfigureJwtAuthentication(context, configuration); ConfigureHangfire(context); - ConfigureCap(context, configuration); - //ConfigureMassTransit(context, configuration); //ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); diff --git a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj index 76dfe60..3e60600 100644 --- a/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj +++ b/web/JiShe.CollectBus.Host/JiShe.CollectBus.Host.csproj @@ -20,10 +20,6 @@ - - - - diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 7cf18d7..f776202 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -74,14 +74,6 @@ "Version": "V1" } ], - "Cap": { - "RabbitMq": { - "HostName": "118.190.144.92", - "UserName": "collectbus", - "Password": "123456", - "Port": 5672 - } - }, "Kafka": { "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "EnableFilter": true, @@ -93,37 +85,7 @@ "KafkaReplicationFactor": 3, "NumPartitions": 30, "ServerTagName": "JiSheCollectBus2" - //"Topic": { - // "ReplicationFactor": 3, - // "NumPartitions": 1000 - //} }, - //"Kafka": { - // "Connections": { - // "Default": { - // "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092" - // // "SecurityProtocol": "SASL_PLAINTEXT", - // // "SaslMechanism": "PLAIN", - // // "SaslUserName": "lixiao", - // // "SaslPassword": "lixiao1980", - // } - // }, - // "Consumer": { - // "GroupId": "JiShe.CollectBus" - // }, - // "Producer": { - // "MessageTimeoutMs": 6000, - // "Acks": -1 - // }, - // "Topic": { - // "ReplicationFactor": 3, - // "NumPartitions": 1000 - // }, - // "EventBus": { - // "GroupId": "JiShe.CollectBus", - // "TopicName": "DefaultTopicName" - // } - //}, "IoTDBOptions": { "UserName": "root", "Password": "root",