diff --git a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs index f2c5686..6c2eacb 100644 --- a/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs +++ b/JiShe.CollectBus.Console/Extensions/DependencyInjections/DependencyInjectionExtensions.cs @@ -23,8 +23,7 @@ namespace Microsoft.Extensions.DependencyInjection var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency")); foreach (var interfaceType in interfaceTypes) { - var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Singleton); - services.Add(serviceDescriptor); + services.AddSingleton(interfaceType, type); } } } @@ -36,8 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); foreach (var interfaceType in interfaceTypes) { - var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Transient); - services.Add(serviceDescriptor); + services.AddTransient(interfaceType, type); } } } @@ -49,8 +47,7 @@ namespace Microsoft.Extensions.DependencyInjection var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); foreach (var interfaceType in interfaceTypes) { - var serviceDescriptor = new ServiceDescriptor(interfaceType, interfaceType.Name, type, ServiceLifetime.Scoped); - services.Add(serviceDescriptor); + services.AddScoped(interfaceType, type); } } } diff --git a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj index 237a69c..3fd1a06 100644 --- a/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj +++ b/JiShe.CollectBus.Console/JiShe.CollectBus.Console.csproj @@ -7,6 +7,8 @@ enable true true + true + diff --git a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs index 6213069..3bae441 100644 --- a/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs +++ b/JiShe.CollectBus.Core/Plugins/TcpServiceReceivedPlugin.cs @@ -1,12 +1,24 @@ -using JiShe.CollectBus.RabbitMQ.Models; +using JiShe.CollectBus.Protocol.Contracts.Interfaces; +using JiShe.CollectBus.RabbitMQ.Models; using JiShe.CollectBus.RabbitMQ.Senders; +using Microsoft.Extensions.DependencyInjection; +using System; using TouchSocket.Core; using TouchSocket.Sockets; namespace JiShe.CollectBus.Core.Plugins { - public partial class TcpServiceReceivedPlugin(IMqSender sender) : PluginBase + public partial class TcpServiceReceivedPlugin : PluginBase { + private readonly IServiceProvider _serviceProvider; + private readonly IMqSender _mqSender; + + public TcpServiceReceivedPlugin(IServiceProvider serviceProvider, IMqSender mqSender) + { + _serviceProvider = serviceProvider; + _mqSender = mqSender; + } + [GeneratorPlugin(typeof(ITcpReceivedPlugin))] public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) { @@ -16,7 +28,7 @@ namespace JiShe.CollectBus.Core.Plugins //const string protocolType = "StandardProtocol"; - //var protocolPlugin = serviceProvider.GetKeyedService(protocolType); + //var protocolPlugin = _serviceProvider.GetKeyedService(protocolType); //var protocolPluginInfo = await protocolPlugin.GetAsync(); //client.Logger.Info($"{protocolPluginInfo.Name},{protocolPluginInfo.RegularExpression}"); ////从客户端收到信息 @@ -25,8 +37,7 @@ namespace JiShe.CollectBus.Core.Plugins //await protocolPlugin.ReceivedAsync(client,e); - - await sender.SendToReportAsync(new ReportDto + await _mqSender.SendToReportAsync(new ReportDto { ClientIP = client.IP, ClientId = client.Id, diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs index e39a18e..471d8d5 100644 --- a/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs +++ b/JiShe.CollectBus.RabbitMQ/Consumers/ReportConsumer.cs @@ -3,7 +3,7 @@ using MassTransit; namespace JiShe.CollectBus.RabbitMQ.Consumers { - internal class ReportConsumer : IConsumer + public class ReportConsumer : IConsumer { public async Task Consume(ConsumeContext context) { diff --git a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs index dee01ae..7234f8c 100644 --- a/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs +++ b/JiShe.CollectBus.RabbitMQ/Extensions/ServiceCollections/ServiceCollectionExtensions.cs @@ -45,6 +45,7 @@ namespace Microsoft.Extensions.DependencyInjection s.ExchangeType = ExchangeType.Direct; }); }); + //cfg.UseRawJsonSerializer(); }); }); services.AddMassTransitHostedService(); diff --git a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj index 9aa5847..f676e4e 100644 --- a/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj +++ b/JiShe.CollectBus.RabbitMQ/JiShe.CollectBus.RabbitMQ.csproj @@ -4,7 +4,8 @@ net8.0 enable enable - preview + preview + true diff --git a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs index 1ea7c5f..3643354 100644 --- a/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs +++ b/JiShe.CollectBus.RabbitMQ/Models/ReportDto.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Text.Json.Serialization; using System.Threading.Tasks; namespace JiShe.CollectBus.RabbitMQ.Models diff --git a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs index 6cedf7d..3e38e0c 100644 --- a/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs +++ b/JiShe.CollectBus.RabbitMQ/Senders/IMqSender.cs @@ -7,7 +7,7 @@ using JiShe.CollectBus.Common.Extensions.DependencyInjections; namespace JiShe.CollectBus.RabbitMQ.Senders { - public interface IMqSender : ISingletonDependency + public interface IMqSender { Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class; Task SendToPushAsync(object message, CancellationToken cancellationToken = default); diff --git a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs index 7b21d0a..b4249c6 100644 --- a/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs +++ b/JiShe.CollectBus.RabbitMQ/Senders/MqSender.cs @@ -4,7 +4,7 @@ using Microsoft.Extensions.Configuration; namespace JiShe.CollectBus.RabbitMQ.Senders { - public class MqSender : IMqSender + public class MqSender : IMqSender, ISingletonDependency { private readonly ISendEndpointProvider _sendEndpointProvider; private readonly IConfiguration _configuration; @@ -17,26 +17,29 @@ namespace JiShe.CollectBus.RabbitMQ.Senders public async Task SendToPushAsync(T message, CancellationToken cancellationToken = default) where T : class { - var queueName = _configuration["MQ:Queue:Push"]; - await SendAsync(queueName, message, cancellationToken); + var queueKey = _configuration["MQ:Queue:Push"]; + await SendAsync(queueKey, message, cancellationToken); } public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default) { - var queueName = _configuration["MQ:Queue:Push"]; - await SendAsync(queueName, message, cancellationToken); + var queueKey = _configuration["MQ:Queue:Push"]; + await SendAsync(queueKey, message, cancellationToken); } public async Task SendToReportAsync(T message, CancellationToken cancellationToken = default) where T : class { - var queueName = _configuration["MQ:Queue:Report"]; - await SendAsync(queueName, message, cancellationToken); + var queueKey = _configuration["MQ:Queue:Report"]; + //await SendAsync(queueName, message, cancellationToken); + var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}")); + await endpoint.Send(message,typeof(T),cancellationToken); + } public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default) { - var queueName = _configuration["MQ:Queue:Report"]; - await SendAsync(queueName, message, cancellationToken); + var queueKey = _configuration["MQ:Queue:Report"]; + await SendAsync(queueKey, message, cancellationToken); }