From 07211eed8b726272879e7e0c9f6e21c21f1ddbf1 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Mon, 28 Oct 2024 16:23:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interfaces/IJiSheModule.cs | 11 ++++ .../JiShe.CollectBus.Common.csproj | 2 + .../DependencyInjectionExtensions.cs | 42 ++++++++------ .../ServiceCollectionExtensions.cs | 7 --- JiShe.CollectBus.Console/Program.cs | 4 +- JiShe.CollectBus.Console/appsettings.json | 4 +- .../JiShe.CollectBus.Consumer.csproj | 10 ++++ JiShe.CollectBus.Consumer/Program.cs | 2 + .../Plugins/TcpServiceReceivedPlugin.cs | 23 +++----- .../Abstracts/BaseProtocolPlugin.cs | 33 +---------- .../Interfaces/IProtocolPlugin.cs | 6 +- .../Models/MessageIssuedEvent.cs | 9 +++ .../Models/MessageReceivedEvent.cs | 11 +--- .../TestProtocolPlugin.cs | 9 +-- .../StandardProtocolPlugin.cs | 30 ++++------ .../Consumers/MessageIssuedConsumer.cs | 26 +++++++++ .../Consumers/MessageReceivedConsumer.cs | 38 +++++++++++++ .../Consumers/PushConsumer.cs | 13 ----- .../Consumers/ReportConsumer.cs | 13 ----- .../ServiceCollectionExtensions.cs | 55 ------------------- .../JiShe.CollectBus.RabbitMQ.csproj | 1 + .../JiSheCollectBusRabbitMqModule.cs | 45 +++++++++++++++ JiShe.CollectBus.RabbitMQ/Models/PushDto.cs | 15 ----- .../Senders/IMqSender.cs | 18 ------ JiShe.CollectBus.RabbitMQ/Senders/INSender.cs | 18 ++++++ JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs | 52 ------------------ JiShe.CollectBus.RabbitMQ/Senders/NSender.cs | 53 ++++++++++++++++++ 27 files changed, 273 insertions(+), 277 deletions(-) create mode 100644 JiShe.CollectBus.Common/Interfaces/IJiSheModule.cs create mode 100644 JiShe.CollectBus.Consumer/JiShe.CollectBus.Consumer.csproj create mode 100644 JiShe.CollectBus.Consumer/Program.cs create mode 100644 JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs rename JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs => JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs (72%) create mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs create mode 100644 JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Models/PushDto.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Senders/INSender.cs delete mode 100644 JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Senders/NSender.cs diff --git a/JiShe.CollectBus.Common/Interfaces/IJiSheModule.cs b/JiShe.CollectBus.Common/Interfaces/IJiSheModule.cs new file mode 100644 index 0000000..62d3cdb --- /dev/null +++ b/JiShe.CollectBus.Common/Interfaces/IJiSheModule.cs @@ -0,0 +1,11 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace JiShe.CollectBus.Common.Interfaces +{ + public interface IJiSheModule + { + void ConfigureServices(IServiceCollection services,HostBuilderContext hostContext); + } +} diff --git a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj index 4dbdb38..b6ff8a7 100644 --- a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj +++ b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj @@ -8,7 +8,9 @@ + + diff --git a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs index 6c2eacb..96b7cfd 100644 --- a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs @@ -1,14 +1,35 @@ using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Protocol.Contracts.Attributes; -using System.Diagnostics.CodeAnalysis; using System.Reflection; +using JiShe.CollectBus.Common.Interfaces; using TouchSocket.Core; +using Microsoft.Extensions.Hosting; +using System; // ReSharper disable once CheckNamespace namespace Microsoft.Extensions.DependencyInjection { public static class DependencyInjectionExtensions { + public static void ModuleRegister(this IServiceCollection services, HostBuilderContext hostContext) + { + var assemblies = GetBinAssemblies(); + + foreach (var assembly in assemblies) + { + var allTypes = assembly.GetTypes(); + foreach (var type in allTypes) + { + if (typeof(IJiSheModule).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + { + + var instance = Activator.CreateInstance(type); + _ = (type.GetMethod("ConfigureServices")?.Invoke(instance, [services, hostContext])); + } + } + } + } + public static void ServiceRegister(this IServiceCollection services) { var assemblies = GetBinAssemblies(); @@ -18,7 +39,8 @@ namespace Microsoft.Extensions.DependencyInjection var allTypes = assembly.GetTypes(); foreach (var type in allTypes) { - if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + if (type is not { IsClass: true, IsAbstract: false }) continue; + if (typeof(ISingletonDependency).IsAssignableFrom(type)) { var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency")); foreach (var interfaceType in interfaceTypes) @@ -26,11 +48,8 @@ namespace Microsoft.Extensions.DependencyInjection services.AddSingleton(interfaceType, type); } } - } - foreach (var type in allTypes) - { - if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + if (typeof(ITransientDependency).IsAssignableFrom(type)) { var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); foreach (var interfaceType in interfaceTypes) @@ -38,11 +57,8 @@ namespace Microsoft.Extensions.DependencyInjection services.AddTransient(interfaceType, type); } } - } - foreach (var type in allTypes) - { - if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + if (typeof(IScopedDependency).IsAssignableFrom(type)) { var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); foreach (var interfaceType in interfaceTypes) @@ -78,10 +94,7 @@ namespace Microsoft.Extensions.DependencyInjection services.Add(serviceDescriptor); } } - } - foreach (var type in allTypes) - { if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) { var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); @@ -93,10 +106,7 @@ namespace Microsoft.Extensions.DependencyInjection services.Add(serviceDescriptor); } } - } - foreach (var type in allTypes) - { if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) { var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); diff --git a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs index 7264af8..fd52635 100644 --- a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs @@ -19,13 +19,6 @@ namespace Microsoft.Extensions.DependencyInjection config.SetListenIPHosts(int.Parse(configuration["TCP:Port"] ?? "10500")) .ConfigureContainer(a => //容器的配置顺序应该在最前面 { - - //a.AddFileLogger(fileLogger => - //{ - // fileLogger.MaxSize = 1024 * 1024; - // fileLogger.LogLevel = LogLevel.Debug; - // fileLogger.CreateLogFolder = level => $"logs\\{DateTime.Now:yyyy-MM-dd}\\{level}"; - //}); a.AddConsoleLogger(); }) .ConfigurePlugins(a => diff --git a/JiShe.CollectBus.Console/Program.cs b/JiShe.CollectBus.Console/Program.cs index 61e58ff..fcaad3a 100644 --- a/JiShe.CollectBus.Console/Program.cs +++ b/JiShe.CollectBus.Console/Program.cs @@ -29,11 +29,11 @@ namespace JiShe.CollectBus.Console private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) { var configuration = hostContext.Configuration; + services.ModuleRegister(hostContext); services.ServiceRegister(); services.PluginServiceRegister(); services.AddTcp(configuration); - services.AddUdp(configuration); - services.AddMassTransit(configuration); + //services.AddUdp(configuration); services.AddStackExchangeRedisCache(options => { options.Configuration = configuration["RedisCache:ConnectionString"]; diff --git a/JiShe.CollectBus.Console/appsettings.json b/JiShe.CollectBus.Console/appsettings.json index ea483c5..4d74af7 100644 --- a/JiShe.CollectBus.Console/appsettings.json +++ b/JiShe.CollectBus.Console/appsettings.json @@ -21,8 +21,8 @@ "UserName": "collectbus", "Password": "123456", "Queue": { - "Push": "PUSH_COMMANDS", - "Report": "REPORT_COMMAND" + "Received": "Received_Command", + "Issued": "Issued_Command" } } } \ No newline at end of file diff --git a/JiShe.CollectBus.Consumer/JiShe.CollectBus.Consumer.csproj b/JiShe.CollectBus.Consumer/JiShe.CollectBus.Consumer.csproj new file mode 100644 index 0000000..2150e37 --- /dev/null +++ b/JiShe.CollectBus.Consumer/JiShe.CollectBus.Consumer.csproj @@ -0,0 +1,10 @@ + + + + Exe + net8.0 + enable + enable + + + diff --git a/JiShe.CollectBus.Consumer/Program.cs b/JiShe.CollectBus.Consumer/Program.cs new file mode 100644 index 0000000..3751555 --- /dev/null +++ b/JiShe.CollectBus.Consumer/Program.cs @@ -0,0 +1,2 @@ +// See https://aka.ms/new-console-template for more information +Console.WriteLine("Hello, World!"); diff --git a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs index 3bae441..1b463e4 100644 --- a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs +++ b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs @@ -1,24 +1,20 @@ -using JiShe.CollectBus.Protocol.Contracts.Interfaces; -using JiShe.CollectBus.RabbitMQ.Models; -using JiShe.CollectBus.RabbitMQ.Senders; -using Microsoft.Extensions.DependencyInjection; -using System; +using JiShe.CollectBus.Protocol.Contracts.Models; using TouchSocket.Core; using TouchSocket.Sockets; +using JiShe.CollectBus.RabbitMQ.Senders; namespace JiShe.CollectBus.Core.Plugins { public partial class TcpServiceReceivedPlugin : PluginBase { - private readonly IServiceProvider _serviceProvider; - private readonly IMqSender _mqSender; + private readonly INSender _nSender; - public TcpServiceReceivedPlugin(IServiceProvider serviceProvider, IMqSender mqSender) + public TcpServiceReceivedPlugin(INSender nSender) { - _serviceProvider = serviceProvider; - _mqSender = mqSender; + _nSender = nSender; } + [GeneratorPlugin(typeof(ITcpReceivedPlugin))] public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) { @@ -36,16 +32,13 @@ namespace JiShe.CollectBus.Core.Plugins //client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}"); //await protocolPlugin.ReceivedAsync(client,e); - - await _mqSender.SendToReportAsync(new ReportDto + await _nSender.SendToReceivedAsync(new MessageReceivedEvent { ClientIP = client.IP, ClientId = client.Id, - Port = client.Port, MessageHexString = Convert.ToHexString(e.ByteBlock.Span), - DeviceNo = "111" + Port = client.Port }); - await e.InvokeNext(); } diff --git a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 383d308..8a700d9 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -1,40 +1,13 @@ -using System; -using System.Threading.Tasks; +using System.Threading.Tasks; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; -using Microsoft.Extensions.Caching.Distributed; -using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { - public abstract class BaseProtocolPlugin(IDistributedCache cache) : IProtocolPlugin + public abstract class BaseProtocolPlugin : IProtocolPlugin { public abstract Task GetAsync(); - public virtual Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) - { - return Task.CompletedTask; - } - - public virtual Task SendAsync() - { - return Task.CompletedTask; - } - - public virtual async Task LoadAsync() - { - var result = cache.GetString("myKey"); - if (result == null) - { - result = "Calculated Data"; - cache.SetString("myKey", result, new DistributedCacheEntryOptions - { - AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10) - }); - } - - await Task.CompletedTask; - } - + public abstract Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent); } } diff --git a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 1c61344..6eadf7e 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -8,10 +8,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces { Task GetAsync(); - Task LoadAsync(); - - Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e); - - Task SendAsync(); + Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent); } } diff --git a/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs b/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs new file mode 100644 index 0000000..2ba9153 --- /dev/null +++ b/JiShe.CollectBus.Protocol.Contracts/Models/MessageIssuedEvent.cs @@ -0,0 +1,9 @@ +namespace JiShe.CollectBus.Protocol.Contracts.Models +{ + public class MessageIssuedEvent + { + public string ClientId { get; set; } + public byte[] Message { get; set; } + public string DeviceNo { get; set; } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs b/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs similarity index 72% rename from JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs rename to JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs index 3643354..87e093d 100644 --- a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Models/MessageReceivedEvent.cs @@ -1,13 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Text.Json.Serialization; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.RabbitMQ.Models +namespace JiShe.CollectBus.Protocol.Contracts.Models { - public class ReportDto + public class MessageReceivedEvent { /// /// 客服端标识 diff --git a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index a6f3b54..13d6c87 100644 --- a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -8,19 +8,14 @@ using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Test { [ProtocolName("TestProtocol")] - public class TestProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency + public class TestProtocolPlugin : BaseProtocolPlugin, ISingletonDependency { public override Task GetAsync() { throw new NotImplementedException(); } - public override Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) - { - throw new NotImplementedException(); - } - - public override Task SendAsync() + public override Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent) { throw new NotImplementedException(); } diff --git a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 1aae7c0..ad49e01 100644 --- a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -3,13 +3,14 @@ using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Attributes; using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.RabbitMQ.Senders; using Microsoft.Extensions.Caching.Distributed; using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol { [ProtocolName("StandardProtocol")] - public class StandardProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency + public class StandardProtocolPlugin(INSender nSender) : BaseProtocolPlugin, ISingletonDependency { //起始字符 private const string stx = "68"; @@ -26,7 +27,7 @@ namespace JiShe.CollectBus.Protocol static List MSA = new List(); static Dictionary> usingMSA = new Dictionary>(); - private ITcpSessionClient tcpSessionClient; + private string clientId = ""; static StandardProtocolPlugin() { @@ -43,16 +44,11 @@ namespace JiShe.CollectBus.Protocol return await Task.FromResult(info); } - public new async Task LoadAsync() + public override async Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent) { - await base.LoadAsync(); - } + clientId = messageReceivedEvent.ClientId; - public override async Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) - { - tcpSessionClient = client; - var messageHexString = Convert.ToHexString(e.ByteBlock.Span); - var cmdResult = AnalysisCmd(messageHexString); + var cmdResult = AnalysisCmd(messageReceivedEvent.MessageHexString); if (cmdResult == null) { return; @@ -61,11 +57,6 @@ namespace JiShe.CollectBus.Protocol await Task.CompletedTask; } - public override async Task SendAsync() - { - await Task.CompletedTask; - } - /// /// Gets the msa. /// @@ -294,9 +285,12 @@ namespace JiShe.CollectBus.Protocol Fn = 1 }; commandReulst.ReplyBytes = GetCommandBytes(reqParam); - tcpSessionClient.SendAsync(tcpSessionClient.Id,commandReulst.ReplyBytes); - - + nSender.SendToIssuedAsync(new MessageIssuedEvent + { + ClientId = clientId, + DeviceNo = "", + Message = commandReulst.ReplyBytes + }); } else if (commandReulst.Fn == 2)//退出登录 { diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs new file mode 100644 index 0000000..f78e267 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs @@ -0,0 +1,26 @@ +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.Protocol.Contracts.Models; +using MassTransit; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using TouchSocket.Sockets; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + public class MessageIssuedConsumer : IConsumer + { + private readonly ILogger _logger; + private readonly ITcpService _tcpService; + + public MessageIssuedConsumer(ILogger logger, ITcpService tcpService) + { + _logger = logger; + _tcpService = tcpService; + } + + public async Task Consume(ConsumeContext context) + { + await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs new file mode 100644 index 0000000..85733a7 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs @@ -0,0 +1,38 @@ +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using MassTransit; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Net.Sockets; +using JiShe.CollectBus.Protocol.Contracts.Models; +using TouchSocket.Sockets; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + public class MessageReceivedConsumer : IConsumer + { + + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + + public MessageReceivedConsumer(ILogger logger, IServiceProvider serviceProvider) + { + _logger = logger; + _serviceProvider = serviceProvider; + } + + public async Task Consume(ConsumeContext context) + { + const string protocolType = "StandardProtocol"; + var protocolPlugin = _serviceProvider.GetKeyedService(protocolType); + if (protocolPlugin == null) + { + _logger.LogError("协议不存在!"); + } + else + { + await protocolPlugin.AnalyzeAsync(context.Message); + } + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs deleted file mode 100644 index 481b54b..0000000 --- a/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using MassTransit; -using JiShe.CollectBus.RabbitMQ.Models; - -namespace JiShe.CollectBus.RabbitMQ.Consumers -{ - public class PushConsumer : IConsumer - { - public async Task Consume(ConsumeContext context) - { - Console.WriteLine(context.Message.ClientId); - } - } -} diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs deleted file mode 100644 index 471d8d5..0000000 --- a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using JiShe.CollectBus.RabbitMQ.Models; -using MassTransit; - -namespace JiShe.CollectBus.RabbitMQ.Consumers -{ - public class ReportConsumer : IConsumer - { - public async Task Consume(ConsumeContext context) - { - Console.WriteLine(context.Message.ClientId); - } - } -} diff --git a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs deleted file mode 100644 index 7234f8c..0000000 --- a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs +++ /dev/null @@ -1,55 +0,0 @@ -using JiShe.CollectBus.RabbitMQ.Consumers; -using MassTransit; -using Microsoft.Extensions.Configuration; -using RabbitMQ.Client; - -// ReSharper disable once CheckNamespace -namespace Microsoft.Extensions.DependencyInjection -{ - public static class ServiceCollectionExtensions - { - public static IServiceCollection AddMassTransit(this IServiceCollection services, IConfiguration configuration) - { - services.AddMassTransit(x => - { - x.UsingRabbitMq((context, cfg) => - { - cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h => - { - h.Username(configuration["MQ:UserName"] ?? string.Empty); - h.Password(configuration["MQ:Password"] ?? string.Empty); - }); - - cfg.ReceiveEndpoint(configuration["MQ:Queue:Push"] ?? string.Empty, x => - { - x.ConfigureConsumeTopology = false; - - x.Consumer(); - - x.Bind("commands", s => - { - s.RoutingKey = configuration["MQ:Queue:Push"]; - s.ExchangeType = ExchangeType.Direct; - }); - }); - - cfg.ReceiveEndpoint(configuration["MQ:Queue:Report"] ?? string.Empty, x => - { - x.ConfigureConsumeTopology = false; - - x.Consumer(); - - x.Bind("commands", s => - { - s.RoutingKey = configuration["MQ:Queue:Report"]; - s.ExchangeType = ExchangeType.Direct; - }); - }); - //cfg.UseRawJsonSerializer(); - }); - }); - services.AddMassTransitHostedService(); - return services; - } - } -} diff --git a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj index f676e4e..254eb41 100644 --- a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj +++ b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj @@ -15,6 +15,7 @@ + diff --git a/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs new file mode 100644 index 0000000..eaeed17 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs @@ -0,0 +1,45 @@ +using System; +using JiShe.CollectBus.Common.Interfaces; +using JiShe.CollectBus.RabbitMQ.Consumers; +using MassTransit; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using RabbitMQ.Client; + +namespace JiShe.CollectBus.RabbitMQ +{ + public class JiSheCollectBusRabbitMqModule: IJiSheModule + { + public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) + { + var configuration = hostContext.Configuration; + services.AddMassTransit(x => + { + x.AddConsumer(); + x.AddConsumer(); + x.UsingRabbitMq((context, cfg) => + { + cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h => + { + h.Username(configuration["MQ:UserName"] ?? string.Empty); + h.Password(configuration["MQ:Password"] ?? string.Empty); + }); + + // 消息接收队列 + cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty,configurator => + { + configurator.ConfigureConsumeTopology = false; + configurator.Consumer(context); + }); + + // 消息下发队列 + cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator => + { + configurator.ConfigureConsumeTopology = false; + configurator.Consumer(context); + }); + }); + }); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs b/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs deleted file mode 100644 index d203020..0000000 --- a/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.RabbitMQ.Models -{ - public class PushDto - { - public int ClientId { get; set; } - public string Message { get; set; } - public string DeviceNo { get; set; } - } -} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs deleted file mode 100644 index 3e38e0c..0000000 --- a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using JiShe.CollectBus.Common.Extensions.DependencyInjections; - -namespace JiShe.CollectBus.RabbitMQ.Senders -{ - public interface IMqSender - { - Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class; - Task SendToPushAsync(object message, CancellationToken cancellationToken = default); - Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class; - Task SendToReportAsync(object message, CancellationToken cancellationToken = default); - Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default); - } -} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/INSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/INSender.cs new file mode 100644 index 0000000..0f5c8fc --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Senders/INSender.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.RabbitMQ.Senders +{ + public interface INSender + { + Task SendToIssuedAsync(T message, CancellationToken cancellationToken = default) where T : class; + Task SendToIssuedAsync(object message, CancellationToken cancellationToken = default); + Task SendToReceivedAsync(T message, CancellationToken cancellationToken = default) where T : class; + Task SendToReceivedAsync(object message, CancellationToken cancellationToken = default); + Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default); + + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs deleted file mode 100644 index b4249c6..0000000 --- a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs +++ /dev/null @@ -1,52 +0,0 @@ -using JiShe.CollectBus.Common.Extensions.DependencyInjections; -using MassTransit; -using Microsoft.Extensions.Configuration; - -namespace JiShe.CollectBus.RabbitMQ.Senders -{ - public class MqSender : IMqSender, ISingletonDependency - { - private readonly ISendEndpointProvider _sendEndpointProvider; - private readonly IConfiguration _configuration; - - public MqSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration) - { - _sendEndpointProvider = sendEndpointProvider; - _configuration = configuration; - } - - public async Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class - { - var queueKey = _configuration["MQ:Queue:Push"]; - await SendAsync(queueKey, message, cancellationToken); - } - - public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default) - { - var queueKey = _configuration["MQ:Queue:Push"]; - await SendAsync(queueKey, message, cancellationToken); - } - - public async Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class - { - var queueKey = _configuration["MQ:Queue:Report"]; - //await SendAsync(queueName, message, cancellationToken); - var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}")); - await endpoint.Send(message,typeof(T),cancellationToken); - - } - - public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default) - { - var queueKey = _configuration["MQ:Queue:Report"]; - await SendAsync(queueKey, message, cancellationToken); - } - - - public async Task SendAsync(string queueKey,object message, CancellationToken cancellationToken = default) - { - var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}")); - await endpoint.Send(message, cancellationToken); - } - } -} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs new file mode 100644 index 0000000..3a57911 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Senders/NSender.cs @@ -0,0 +1,53 @@ +using MassTransit; +using Microsoft.Extensions.Configuration; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; + +namespace JiShe.CollectBus.RabbitMQ.Senders +{ + public class NSender:INSender,ISingletonDependency + { + private readonly ISendEndpointProvider _sendEndpointProvider; + private readonly string _issuedKey; + private readonly string _receivedKey; + + + public NSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration) + { + _sendEndpointProvider = sendEndpointProvider; + _issuedKey = configuration["MQ:Queue:Issued"]!; + _receivedKey = configuration["MQ:Queue:Received"]!; + } + + public async Task SendToIssuedAsync(T message, CancellationToken cancellationToken = default) where T : class + { + await SendAsync(_issuedKey, message, cancellationToken); + } + + public async Task SendToIssuedAsync(object message, CancellationToken cancellationToken = default) + { + await SendAsync(_issuedKey, message, cancellationToken); + } + + public async Task SendToReceivedAsync(T message, CancellationToken cancellationToken = default) where T : class + { + await SendAsync(_receivedKey, message, cancellationToken); + } + + public async Task SendToReceivedAsync(object message, CancellationToken cancellationToken = default) + { + await SendAsync(_receivedKey, message, cancellationToken); + } + + + public async Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default) + { + var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}")); + await endpoint.Send(message, cancellationToken); + } + } +}