From 16bd93cc110c014e0d6f8a1f67a87cecf921df78 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Fri, 25 Oct 2024 19:11:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JiShe.CollectBus.ClickHouse.csproj | 17 +-- .../DependencyInjections/IScopedDependency.cs | 6 + .../ISingletonDependency.cs | 6 + .../ITransientDependency.cs | 6 + .../JiShe.CollectBus.Common.csproj | 16 +-- .../DependencyInjectionExtensions.cs | 106 ++++++++++++++---- .../ServiceCollectionExtensions.cs | 8 -- .../JiShe.CollectBus.Console.csproj | 3 + JiShe.CollectBus.Console/Program.cs | 1 + JiShe.CollectBus.Console/appsettings.json | 11 ++ .../JiShe.CollectBus.Core.csproj | 39 ++++--- .../Plugins/TcpServiceReceivedPlugin.cs | 38 ++++--- ...iShe.CollectBus.EntityFrameworkCore.csproj | 36 +++--- .../Abstracts/BaseProtocolPlugin.cs | 28 ++--- .../Attributes/ProtocolNameAttribute.cs | 11 +- .../DependencyInjection/IScopedDependency.cs | 10 -- .../ISingletonDependency.cs | 10 -- .../ITransientDependency.cs | 10 -- .../Interfaces/IProtocolPlugin.cs | 11 +- ...JiShe.CollectBus.Protocol.Contracts.csproj | 4 +- .../JiShe.CollectBus.Protocol.Test.csproj | 23 ++-- .../TestProtocolPlugin.cs | 89 ++------------- .../JiShe.CollectBus.Protocol.csproj | 35 +++--- .../StandardProtocolPlugin.cs | 21 ++-- .../Consumers/PushConsumer.cs | 13 +++ .../Consumers/ReportConsumer.cs | 13 +++ .../ServiceCollectionExtensions.cs | 54 +++++++++ .../JiShe.CollectBus.RabbitMQ.csproj | 19 ++++ JiShe.CollectBus.RabbitMQ/Models/PushDto.cs | 15 +++ JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs | 36 ++++++ .../Senders/IMqSender.cs | 18 +++ JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs | 49 ++++++++ JiShe.CollectBus.sln | 9 +- 33 files changed, 503 insertions(+), 268 deletions(-) create mode 100644 JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs create mode 100644 JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs create mode 100644 JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs delete mode 100644 JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs delete mode 100644 JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs delete mode 100644 JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs create mode 100644 JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj create mode 100644 JiShe.CollectBus.RabbitMQ/Models/PushDto.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs create mode 100644 JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs diff --git a/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj b/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj index 77edadd..58d0070 100644 --- a/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj +++ b/JiShe.CollectBus.ClickHouse/JiShe.CollectBus.ClickHouse.csproj @@ -1,13 +1,14 @@  - - net8.0 - enable - enable - + + net8.0 + enable + enable + preview + - - - + + + diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs new file mode 100644 index 0000000..2b4fe1c --- /dev/null +++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/IScopedDependency.cs @@ -0,0 +1,6 @@ +namespace JiShe.CollectBus.Common.Extensions.DependencyInjections +{ + public interface IScopedDependency + { + } +} diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs new file mode 100644 index 0000000..c3f89fa --- /dev/null +++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ISingletonDependency.cs @@ -0,0 +1,6 @@ +namespace JiShe.CollectBus.Common.Extensions.DependencyInjections +{ + public interface ISingletonDependency + { + } +} diff --git a/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs new file mode 100644 index 0000000..4c2092c --- /dev/null +++ b/JiShe.CollectBus.Common/Extensions/DependencyInjections/ITransientDependency.cs @@ -0,0 +1,6 @@ +namespace JiShe.CollectBus.Common.Extensions.DependencyInjections +{ + public interface ITransientDependency + { + } +} diff --git a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj index 626aa63..4dbdb38 100644 --- a/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj +++ b/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj @@ -1,12 +1,14 @@  - - netstandard2.1 - enable - + + netstandard2.1 + enable + preview + - - - + + + + diff --git a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs index 482bdc7..f2c5686 100644 --- a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs @@ -1,6 +1,7 @@ -using System.Reflection; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Protocol.Contracts.Attributes; -using JiShe.CollectBus.Protocol.Contracts.DependencyInjection; +using System.Diagnostics.CodeAnalysis; +using System.Reflection; using TouchSocket.Core; // ReSharper disable once CheckNamespace @@ -8,16 +9,26 @@ namespace Microsoft.Extensions.DependencyInjection { public static class DependencyInjectionExtensions { - public static void ServiceRegister(this IServiceCollection services, params Assembly[] assemblies) + public static void ServiceRegister(this IServiceCollection services) { - if (assemblies.Length <= 0) - { - assemblies = [Assembly.GetExecutingAssembly()]; - } + var assemblies = GetBinAssemblies(); foreach (var assembly in assemblies) { var allTypes = assembly.GetTypes(); + foreach (var type in allTypes) + { + if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + { + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency")); + foreach (var interfaceType in interfaceTypes) + { + var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Singleton); + services.Add(serviceDescriptor); + } + } + } + foreach (var type in allTypes) { if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) @@ -25,14 +36,38 @@ namespace Microsoft.Extensions.DependencyInjection var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); foreach (var interfaceType in interfaceTypes) { - var attr = type.GetCustomAttribute(); - if (attr == null) continue; - var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient); + var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Transient); services.Add(serviceDescriptor); } } } + foreach (var type in allTypes) + { + if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + { + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); + foreach (var interfaceType in interfaceTypes) + { + var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Scoped); + services.Add(serviceDescriptor); + } + } + } + } + } + + public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "") + { + if (pluginPath.IsNullOrWhiteSpace()) + { + pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"); + } + var assemblies = GetAssembliesFromFolder(pluginPath); + + foreach (var assembly in assemblies) + { + var allTypes = assembly.GetTypes(); foreach (var type in allTypes) { if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) @@ -48,6 +83,21 @@ namespace Microsoft.Extensions.DependencyInjection } } + foreach (var type in allTypes) + { + if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) + { + var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); + foreach (var interfaceType in interfaceTypes) + { + var attr = type.GetCustomAttribute(); + if (attr == null) continue; + var serviceDescriptor = new ServiceDescriptor(interfaceType, attr.Name, type, ServiceLifetime.Transient); + services.Add(serviceDescriptor); + } + } + } + foreach (var type in allTypes) { if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) @@ -65,20 +115,10 @@ namespace Microsoft.Extensions.DependencyInjection } } - public static void PluginServiceRegister(this IServiceCollection services, string pluginPath = "") - { - if (pluginPath.IsNullOrWhiteSpace()) - { - pluginPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Plugins"); - } - var assemblies = GetAssembliesFromFolder(pluginPath); - ServiceRegister(services, assemblies.ToArray()); - } - private static IEnumerable GetAssembliesFromFolder(string folderPath) { var directory = new DirectoryInfo(folderPath); - if (!directory.Exists) return Enumerable.Empty(); + if (!directory.Exists) return []; var files = directory.GetFiles("*.dll"); @@ -98,5 +138,29 @@ namespace Microsoft.Extensions.DependencyInjection return assemblies; } + + private static IEnumerable GetBinAssemblies() + { + var directory = new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory); + if (!directory.Exists) return []; + + var files = directory.GetFiles("JiShe.CollectBus.*.dll"); + + var assemblies = new List(); + foreach (var file in files) + { + try + { + var assembly = Assembly.LoadFrom(file.FullName); + assemblies.Add(assembly); + } + catch (Exception ex) + { + Console.WriteLine($"Error loading assembly from {file.FullName}: {ex.Message}"); + } + } + + return assemblies; + } } } diff --git a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs index b39c6f3..7264af8 100644 --- a/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/ServiceCollections/ServiceCollectionExtensions.cs @@ -30,14 +30,6 @@ namespace Microsoft.Extensions.DependencyInjection }) .ConfigurePlugins(a => { - a.UseCheckClear() - .SetCheckClearType(CheckClearType.All) - .SetTick(TimeSpan.FromSeconds(60)) - .SetOnClose((c, t) => - { - c.TryShutdown(); - c.SafeClose("超时无数据"); - }); a.Add(); a.Add(); a.Add(); diff --git a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj index aa9636a..237a69c 100644 --- a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj +++ b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj @@ -27,13 +27,16 @@ + + + diff --git a/JiShe.CollectBus.Console/Program.cs b/JiShe.CollectBus.Console/Program.cs index c26b9fa..61e58ff 100644 --- a/JiShe.CollectBus.Console/Program.cs +++ b/JiShe.CollectBus.Console/Program.cs @@ -33,6 +33,7 @@ namespace JiShe.CollectBus.Console services.PluginServiceRegister(); services.AddTcp(configuration); services.AddUdp(configuration); + services.AddMassTransit(configuration); services.AddStackExchangeRedisCache(options => { options.Configuration = configuration["RedisCache:ConnectionString"]; diff --git a/JiShe.CollectBus.Console/appsettings.json b/JiShe.CollectBus.Console/appsettings.json index 27b20f5..ea483c5 100644 --- a/JiShe.CollectBus.Console/appsettings.json +++ b/JiShe.CollectBus.Console/appsettings.json @@ -13,5 +13,16 @@ "RedisCache": { "ConnectionString": "123456@qwer@localhost:6379", "InstanceName": "CollectBus" + }, + "MQ": { + "Host": "118.190.144.92", + "Port": "5672", + "VirtualHost": "/", + "UserName": "collectbus", + "Password": "123456", + "Queue": { + "Push": "PUSH_COMMANDS", + "Report": "REPORT_COMMAND" + } } } \ No newline at end of file diff --git a/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj b/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj index 9286c2d..a893ed6 100644 --- a/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj +++ b/JiShe.CollectBus.Core/JiShe.CollectBus.Core.csproj @@ -1,25 +1,28 @@  - - net8.0 - enable - enable - + + net8.0 + enable + enable + preview - - - - - + - - - - + + + + + - - - - + + + + + + + + + + diff --git a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs index 9e42266..6213069 100644 --- a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs +++ b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs @@ -1,28 +1,40 @@ -using Microsoft.Extensions.DependencyInjection; -using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.RabbitMQ.Models; +using JiShe.CollectBus.RabbitMQ.Senders; using TouchSocket.Core; using TouchSocket.Sockets; namespace JiShe.CollectBus.Core.Plugins { - public partial class TcpServiceReceivedPlugin(IServiceProvider serviceProvider) : PluginBase + public partial class TcpServiceReceivedPlugin(IMqSender sender) : PluginBase { [GeneratorPlugin(typeof(ITcpReceivedPlugin))] public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) { - //TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus - //TODO: 水表主站到集中器的协议分为118和645-97协议 - //TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送 + ////TODO: 电表主站到集中器的协议都是376.1协议,集中器下发到电表协议分为645-07和modbus + ////TODO: 水表主站到集中器的协议分为118和645-97协议 + ////TODO: 连接成功时获取档案信息,根据档案信息匹配协议正则获取协议服务进行监听发送 - const string protocolType = "StandardProtocol"; + //const string protocolType = "StandardProtocol"; - var protocolPlugin = serviceProvider.GetKeyedService(protocolType); - client.Logger.Info($"{protocolPlugin?.Get().Name},{protocolPlugin?.Get().RegularExpression}"); - //从客户端收到信息 - var messageHexString = Convert.ToHexString(e.ByteBlock.Span); - client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}"); + //var protocolPlugin = serviceProvider.GetKeyedService(protocolType); + //var protocolPluginInfo = await protocolPlugin.GetAsync(); + //client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}"); + ////从客户端收到信息 + //var messageHexString = Convert.ToHexString(e.ByteBlock.Span); + //client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}"); + + //await protocolPlugin.ReceivedAsync(client,e); + + + await sender.SendToReportAsync(new ReportDto + { + ClientIP = client.IP, + ClientId = client.Id, + Port = client.Port, + MessageHexString = Convert.ToHexString(e.ByteBlock.Span), + DeviceNo = "111" + }); - protocolPlugin?.Received(client,e); await e.InvokeNext(); } diff --git a/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj b/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj index a5d38eb..6764619 100644 --- a/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj +++ b/JiShe.CollectBus.EntityFrameworkCore/JiShe.CollectBus.EntityFrameworkCore.csproj @@ -1,23 +1,25 @@  - - net8.0 - enable - enable - + + net8.0 + enable + enable + preview - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + - - - + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 33e823b..383d308 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; using Microsoft.Extensions.Caching.Distributed; @@ -6,32 +7,33 @@ using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { - public abstract class BaseProtocolPlugin:IProtocolPlugin + public abstract class BaseProtocolPlugin(IDistributedCache cache) : IProtocolPlugin { - private readonly IDistributedCache _cache; + public abstract Task GetAsync(); - protected BaseProtocolPlugin(IDistributedCache cache) + public virtual Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) { - _cache = cache; + return Task.CompletedTask; } - public abstract ProtocolInfo Get(); - - public abstract void Received(ITcpSessionClient client, ReceivedDataEventArgs e); - - public abstract void Send(); - - public void Load() + public virtual Task SendAsync() { - var result = _cache.GetString("myKey"); + return Task.CompletedTask; + } + + public virtual async Task LoadAsync() + { + var result = cache.GetString("myKey"); if (result == null) { result = "Calculated Data"; - _cache.SetString("myKey", result, new DistributedCacheEntryOptions + cache.SetString("myKey", result, new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10) }); } + + await Task.CompletedTask; } } diff --git a/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs b/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs index 93aa9bf..a04af71 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Attributes/ProtocolNameAttribute.cs @@ -1,17 +1,10 @@ using System; -using System.Collections.Generic; -using System.Text; namespace JiShe.CollectBus.Protocol.Contracts.Attributes { [AttributeUsage(AttributeTargets.Class)] - public class ProtocolNameAttribute: Attribute + public class ProtocolNameAttribute(string name) : Attribute { - public ProtocolNameAttribute(string name) - { - Name = name; - } - - public string Name { get; set; } + public string Name { get; set; } = name; } } diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs deleted file mode 100644 index 8d5bd50..0000000 --- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/IScopedDependency.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection -{ - public interface IScopedDependency - { - } -} diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs deleted file mode 100644 index 411e68d..0000000 --- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ISingletonDependency.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection -{ - public interface ISingletonDependency - { - } -} diff --git a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs b/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs deleted file mode 100644 index e43fe49..0000000 --- a/JiShe.CollectBus.Protocol.Contracts/DependencyInjection/ITransientDependency.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JiShe.CollectBus.Protocol.Contracts.DependencyInjection -{ - public interface ITransientDependency - { - } -} diff --git a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 54bb050..1c61344 100644 --- a/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -1,16 +1,17 @@ -using JiShe.CollectBus.Protocol.Contracts.Models; +using System.Threading.Tasks; +using JiShe.CollectBus.Protocol.Contracts.Models; using TouchSocket.Sockets; namespace JiShe.CollectBus.Protocol.Contracts.Interfaces { public interface IProtocolPlugin { - ProtocolInfo Get(); + Task GetAsync(); - void Load(); + Task LoadAsync(); - void Received(ITcpSessionClient client, ReceivedDataEventArgs e); + Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e); - void Send(); + Task SendAsync(); } } diff --git a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 79afffb..2dba74a 100644 --- a/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -3,15 +3,17 @@ netstandard2.1 enable + preview + - + diff --git a/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj b/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj index f15ccdf..8160934 100644 --- a/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj +++ b/JiShe.CollectBus.Protocol.Test/JiShe.CollectBus.Protocol.Test.csproj @@ -1,17 +1,18 @@  - - net8.0 - enable - enable - + + net8.0 + enable + enable + preview + - - - + + + - - - + + + diff --git a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs index f3c0870..a6f3b54 100644 --- a/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol.Test/TestProtocolPlugin.cs @@ -1,6 +1,6 @@ -using JiShe.CollectBus.Protocol.Contracts.Abstracts; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; +using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Attributes; -using JiShe.CollectBus.Protocol.Contracts.DependencyInjection; using JiShe.CollectBus.Protocol.Contracts.Models; using Microsoft.Extensions.Caching.Distributed; using TouchSocket.Sockets; @@ -10,92 +10,19 @@ namespace JiShe.CollectBus.Protocol.Test [ProtocolName("TestProtocol")] public class TestProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency { - public override ProtocolInfo Get() - { - return new ProtocolInfo("Test", "376.1", "TCP", "376.1协议", "DTSU1980"); - } - - public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e) + public override Task GetAsync() { throw new NotImplementedException(); } - public override void Send() + public override Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) { throw new NotImplementedException(); } - //档案下发 - //var listMeter = new List() { new MeterParameter(){ - // Pn = 1, - // BaudRate = 3, - // Port = 2, - // ProtocolType = ProtocolType.DLT6452007, - // Address = "312408006642", - // Password = "000000", - // RateNumber = 4, - // IntegerBitNumber = 4, - // DecimalBitNumber = 4, - // CollectorAddress = "000000000000", - // UserCategoryNumber = 0, - // UserSubclassNumber = 0 - //} }; - //new BuildCommand().GetAFN04F10DataUnit(new ReqParameter2() { - // AFN = AFN.设置参数, - // CMasterStationFunCode = CMasterStationFunCode.请求1级数据, - // A= "322009872", - // Seq = new Seq() - // { - // TpV = TpV.附加信息域中无时间标签, - // FIRFIN = FIRFIN.单帧, - // CON = CON.需要对该帧进行确认, - // PRSEQ = 10, - // }, - // MSA = 13, - // Pn = 0, - // Fn = 10 - //},listMeter); - //档案读取 - //new BuildCommand().GetAFN10F10DataUnit(new ReqParameter2() - //{ - // AFN = AFN.查询参数, - // CMasterStationFunCode = CMasterStationFunCode.请求2级数据, - // A = "322009872", - // Seq = new Seq() - // { - // TpV = TpV.附加信息域中无时间标签, - // FIRFIN = FIRFIN.单帧, - // CON = CON.不需要对该帧进行确认, - // PRSEQ = 2, - // }, - // MSA = 13, - // Pn = 0, - // Fn = 10 - //},new List() {1,2 }); - - //var str = "68A600A6006888203290261A0A6200000201010001000100621E426622082431000000000000040300000000000000CA16"; - //var cmdResult = new BuildCommand().AnalysisCmd(str); - //if(cmdResult != null) - //{ - // var list = new BuildCommand().AnalysisAFN04F10DataUnit(cmdResult.HexDatas); - //} - //new BuildCommand().GetCommandBytes(new ReqParameter2() - //{ - // AFN = AFN.请求实时数据, - // CMasterStationFunCode = CMasterStationFunCode.请求2级数据, - // A = "322009872", - // Seq = new Seq() - // { - // TpV = TpV.附加信息域中无时间标签, - // FIRFIN = FIRFIN.单帧, - // CON = CON.不需要对该帧进行确认, - // PRSEQ = 2, - // }, - // MSA = 13, - // Pn = 1, - // Fn = 129 - // }); - - //new BuildCommand().AmmeterValveControl("312408006642", "", "000000", true); + public override Task SendAsync() + { + throw new NotImplementedException(); + } } } diff --git a/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj b/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj index 708f4a0..b54c79a 100644 --- a/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj +++ b/JiShe.CollectBus.Protocol/JiShe.CollectBus.Protocol.csproj @@ -1,23 +1,26 @@  - - net8.0 - enable - enable - - + + net8.0 + enable + enable + + preview - - - + - - - - + + + - - - + + + + + + + + + diff --git a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs index 909c710..1aae7c0 100644 --- a/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs +++ b/JiShe.CollectBus.Protocol/StandardProtocolPlugin.cs @@ -1,7 +1,7 @@ using JiShe.CollectBus.Common; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Attributes; -using JiShe.CollectBus.Protocol.Contracts.DependencyInjection; using JiShe.CollectBus.Protocol.Contracts.Models; using Microsoft.Extensions.Caching.Distributed; using TouchSocket.Sockets; @@ -37,17 +37,18 @@ namespace JiShe.CollectBus.Protocol } - public override ProtocolInfo Get() + public override async Task GetAsync() { - return new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980"); + var info = new ProtocolInfo("Standard", "376.1", "TCP", "376.1协议", "DTS1980"); + return await Task.FromResult(info); } - public new void Load() + public new async Task LoadAsync() { - base.Load(); + await base.LoadAsync(); } - public override void Received(ITcpSessionClient client, ReceivedDataEventArgs e) + public override async Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) { tcpSessionClient = client; var messageHexString = Convert.ToHexString(e.ByteBlock.Span); @@ -57,15 +58,16 @@ namespace JiShe.CollectBus.Protocol return; } AnalysisData(cmdResult); + await Task.CompletedTask; } - public override void Send() + public override async Task SendAsync() { - throw new NotImplementedException(); + await Task.CompletedTask; } /// - /// Gets the msa. + /// Gets the msa. /// /// The mark. /// @@ -293,6 +295,7 @@ namespace JiShe.CollectBus.Protocol }; commandReulst.ReplyBytes = GetCommandBytes(reqParam); tcpSessionClient.SendAsync(tcpSessionClient.Id,commandReulst.ReplyBytes); + } else if (commandReulst.Fn == 2)//退出登录 diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs new file mode 100644 index 0000000..481b54b --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/PushConsumer.cs @@ -0,0 +1,13 @@ +using MassTransit; +using JiShe.CollectBus.RabbitMQ.Models; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + public class PushConsumer : IConsumer + { + public async Task Consume(ConsumeContext context) + { + Console.WriteLine(context.Message.ClientId); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs new file mode 100644 index 0000000..e39a18e --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs @@ -0,0 +1,13 @@ +using JiShe.CollectBus.RabbitMQ.Models; +using MassTransit; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + internal class ReportConsumer : IConsumer + { + public async Task Consume(ConsumeContext context) + { + Console.WriteLine(context.Message.ClientId); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..dee01ae --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs @@ -0,0 +1,54 @@ +using JiShe.CollectBus.RabbitMQ.Consumers; +using MassTransit; +using Microsoft.Extensions.Configuration; +using RabbitMQ.Client; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddMassTransit(this IServiceCollection services, IConfiguration configuration) + { + services.AddMassTransit(x => + { + x.UsingRabbitMq((context, cfg) => + { + cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h => + { + h.Username(configuration["MQ:UserName"] ?? string.Empty); + h.Password(configuration["MQ:Password"] ?? string.Empty); + }); + + cfg.ReceiveEndpoint(configuration["MQ:Queue:Push"] ?? string.Empty, x => + { + x.ConfigureConsumeTopology = false; + + x.Consumer(); + + x.Bind("commands", s => + { + s.RoutingKey = configuration["MQ:Queue:Push"]; + s.ExchangeType = ExchangeType.Direct; + }); + }); + + cfg.ReceiveEndpoint(configuration["MQ:Queue:Report"] ?? string.Empty, x => + { + x.ConfigureConsumeTopology = false; + + x.Consumer(); + + x.Bind("commands", s => + { + s.RoutingKey = configuration["MQ:Queue:Report"]; + s.ExchangeType = ExchangeType.Direct; + }); + }); + }); + }); + services.AddMassTransitHostedService(); + return services; + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj new file mode 100644 index 0000000..9aa5847 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj @@ -0,0 +1,19 @@ + + + + net8.0 + enable + enable + preview + + + + + + + + + + + + diff --git a/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs b/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs new file mode 100644 index 0000000..d203020 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Models/PushDto.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.RabbitMQ.Models +{ + public class PushDto + { + public int ClientId { get; set; } + public string Message { get; set; } + public string DeviceNo { get; set; } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs new file mode 100644 index 0000000..1ea7c5f --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.RabbitMQ.Models +{ + public class ReportDto + { + /// + /// 客服端标识 + /// + public string ClientId { get; set; } + + /// + /// 客服端IP + /// + public string ClientIP { get; set; } + + /// + /// 客服端端口 + /// + public int Port { get; set; } + + /// + /// 客服端报文 + /// + public string MessageHexString { get; set; } + + /// + /// 设备号 + /// + public string DeviceNo { get; set; } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs new file mode 100644 index 0000000..6cedf7d --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.Common.Extensions.DependencyInjections; + +namespace JiShe.CollectBus.RabbitMQ.Senders +{ + public interface IMqSender : ISingletonDependency + { + Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class; + Task SendToPushAsync(object message, CancellationToken cancellationToken = default); + Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class; + Task SendToReportAsync(object message, CancellationToken cancellationToken = default); + Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default); + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs new file mode 100644 index 0000000..7b21d0a --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs @@ -0,0 +1,49 @@ +using JiShe.CollectBus.Common.Extensions.DependencyInjections; +using MassTransit; +using Microsoft.Extensions.Configuration; + +namespace JiShe.CollectBus.RabbitMQ.Senders +{ + public class MqSender : IMqSender + { + private readonly ISendEndpointProvider _sendEndpointProvider; + private readonly IConfiguration _configuration; + + public MqSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration) + { + _sendEndpointProvider = sendEndpointProvider; + _configuration = configuration; + } + + public async Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class + { + var queueName = _configuration["MQ:Queue:Push"]; + await SendAsync(queueName, message, cancellationToken); + } + + public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default) + { + var queueName = _configuration["MQ:Queue:Push"]; + await SendAsync(queueName, message, cancellationToken); + } + + public async Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class + { + var queueName = _configuration["MQ:Queue:Report"]; + await SendAsync(queueName, message, cancellationToken); + } + + public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default) + { + var queueName = _configuration["MQ:Queue:Report"]; + await SendAsync(queueName, message, cancellationToken); + } + + + public async Task SendAsync(string queueKey,object message, CancellationToken cancellationToken = default) + { + var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}")); + await endpoint.Send(message, cancellationToken); + } + } +} diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index ece59cf..0832b98 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -21,7 +21,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.Protocol.T EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.EntityFrameworkCore", "JiShe.CollectBus.EntityFrameworkCore\JiShe.CollectBus.EntityFrameworkCore.csproj", "{16D42BCF-EDB8-4153-B37D-0B10FB6DF36C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.ClickHouse", "JiShe.CollectBus.ClickHouse\JiShe.CollectBus.ClickHouse.csproj", "{65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.RabbitMQ", "JiShe.CollectBus.RabbitMQ\JiShe.CollectBus.RabbitMQ.csproj", "{DB46D90E-304D-48B7-9ED6-F4DCC95D3824}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -61,6 +63,10 @@ Global {65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Debug|Any CPU.Build.0 = Debug|Any CPU {65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.ActiveCfg = Release|Any CPU {65A2837C-A5EE-475B-8079-EE5A1BCD2E8F}.Release|Any CPU.Build.0 = Release|Any CPU + {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB46D90E-304D-48B7-9ED6-F4DCC95D3824}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -74,6 +80,7 @@ Global {289196B4-FFBE-4E40-A3A1-FCFADBE945ED} = {3A04FB29-EA75-4499-BBF3-AF24C7D46A1D} {16D42BCF-EDB8-4153-B37D-0B10FB6DF36C} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} {65A2837C-A5EE-475B-8079-EE5A1BCD2E8F} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} + {DB46D90E-304D-48B7-9ED6-F4DCC95D3824} = {C7DEC9FB-3F75-4584-85B0-16EA3CB222E5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {33261859-9CD1-4A43-B181-AB75C247D1CD}