修改代码

This commit is contained in:
cli 2024-10-28 16:23:39 +08:00
parent 0b1532d4af
commit 07211eed8b
27 changed files with 273 additions and 277 deletions

View File

@ -0,0 +1,11 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace JiShe.CollectBus.Common.Interfaces
{
public interface IJiSheModule
{
void ConfigureServices(IServiceCollection services,HostBuilderContext hostContext);
}
}

View File

@ -8,7 +8,9 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0" /> <PackageReference Include="JetBrains.Annotations" Version="2024.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,14 +1,35 @@
using JiShe.CollectBus.Common.Extensions.DependencyInjections; using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Attributes; using JiShe.CollectBus.Protocol.Contracts.Attributes;
using System.Diagnostics.CodeAnalysis;
using System.Reflection; using System.Reflection;
using JiShe.CollectBus.Common.Interfaces;
using TouchSocket.Core; using TouchSocket.Core;
using Microsoft.Extensions.Hosting;
using System;
// ReSharper disable once CheckNamespace // ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
{ {
public static class DependencyInjectionExtensions public static class DependencyInjectionExtensions
{ {
public static void ModuleRegister(this IServiceCollection services, HostBuilderContext hostContext)
{
var assemblies = GetBinAssemblies();
foreach (var assembly in assemblies)
{
var allTypes = assembly.GetTypes();
foreach (var type in allTypes)
{
if (typeof(IJiSheModule).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{
var instance = Activator.CreateInstance(type);
_ = (type.GetMethod("ConfigureServices")?.Invoke(instance, [services, hostContext]));
}
}
}
}
public static void ServiceRegister(this IServiceCollection services) public static void ServiceRegister(this IServiceCollection services)
{ {
var assemblies = GetBinAssemblies(); var assemblies = GetBinAssemblies();
@ -18,7 +39,8 @@ namespace Microsoft.Extensions.DependencyInjection
var allTypes = assembly.GetTypes(); var allTypes = assembly.GetTypes();
foreach (var type in allTypes) foreach (var type in allTypes)
{ {
if (typeof(ISingletonDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) if (type is not { IsClass: true, IsAbstract: false }) continue;
if (typeof(ISingletonDependency).IsAssignableFrom(type))
{ {
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency")); var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ISingletonDependency"));
foreach (var interfaceType in interfaceTypes) foreach (var interfaceType in interfaceTypes)
@ -26,11 +48,8 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton(interfaceType, type); services.AddSingleton(interfaceType, type);
} }
} }
}
foreach (var type in allTypes) if (typeof(ITransientDependency).IsAssignableFrom(type))
{
if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{ {
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
foreach (var interfaceType in interfaceTypes) foreach (var interfaceType in interfaceTypes)
@ -38,11 +57,8 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddTransient(interfaceType, type); services.AddTransient(interfaceType, type);
} }
} }
}
foreach (var type in allTypes) if (typeof(IScopedDependency).IsAssignableFrom(type))
{
if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{ {
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency"));
foreach (var interfaceType in interfaceTypes) foreach (var interfaceType in interfaceTypes)
@ -78,10 +94,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.Add(serviceDescriptor); services.Add(serviceDescriptor);
} }
} }
}
foreach (var type in allTypes)
{
if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) if (typeof(ITransientDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{ {
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency")); var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("ITransientDependency"));
@ -93,10 +106,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.Add(serviceDescriptor); services.Add(serviceDescriptor);
} }
} }
}
foreach (var type in allTypes)
{
if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false }) if (typeof(IScopedDependency).IsAssignableFrom(type) && type is { IsClass: true, IsAbstract: false })
{ {
var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency")); var interfaceTypes = type.GetInterfaces().Where(p => p.FullName != null && !p.FullName.Contains("IScopedDependency"));

View File

@ -19,13 +19,6 @@ namespace Microsoft.Extensions.DependencyInjection
config.SetListenIPHosts(int.Parse(configuration["TCP:Port"] ?? "10500")) config.SetListenIPHosts(int.Parse(configuration["TCP:Port"] ?? "10500"))
.ConfigureContainer(a => //容器的配置顺序应该在最前面 .ConfigureContainer(a => //容器的配置顺序应该在最前面
{ {
//a.AddFileLogger(fileLogger =>
//{
// fileLogger.MaxSize = 1024 * 1024;
// fileLogger.LogLevel = LogLevel.Debug;
// fileLogger.CreateLogFolder = level => $"logs\\{DateTime.Now:yyyy-MM-dd}\\{level}";
//});
a.AddConsoleLogger(); a.AddConsoleLogger();
}) })
.ConfigurePlugins(a => .ConfigurePlugins(a =>

View File

@ -29,11 +29,11 @@ namespace JiShe.CollectBus.Console
private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext) private static void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
{ {
var configuration = hostContext.Configuration; var configuration = hostContext.Configuration;
services.ModuleRegister(hostContext);
services.ServiceRegister(); services.ServiceRegister();
services.PluginServiceRegister(); services.PluginServiceRegister();
services.AddTcp(configuration); services.AddTcp(configuration);
services.AddUdp(configuration); //services.AddUdp(configuration);
services.AddMassTransit(configuration);
services.AddStackExchangeRedisCache(options => services.AddStackExchangeRedisCache(options =>
{ {
options.Configuration = configuration["RedisCache:ConnectionString"]; options.Configuration = configuration["RedisCache:ConnectionString"];

View File

@ -21,8 +21,8 @@
"UserName": "collectbus", "UserName": "collectbus",
"Password": "123456", "Password": "123456",
"Queue": { "Queue": {
"Push": "PUSH_COMMANDS", "Received": "Received_Command",
"Report": "REPORT_COMMAND" "Issued": "Issued_Command"
} }
} }
} }

View File

@ -0,0 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,2 @@
// See https://aka.ms/new-console-template for more information
Console.WriteLine("Hello, World!");

View File

@ -1,24 +1,20 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.RabbitMQ.Models;
using JiShe.CollectBus.RabbitMQ.Senders;
using Microsoft.Extensions.DependencyInjection;
using System;
using TouchSocket.Core; using TouchSocket.Core;
using TouchSocket.Sockets; using TouchSocket.Sockets;
using JiShe.CollectBus.RabbitMQ.Senders;
namespace JiShe.CollectBus.Core.Plugins namespace JiShe.CollectBus.Core.Plugins
{ {
public partial class TcpServiceReceivedPlugin : PluginBase public partial class TcpServiceReceivedPlugin : PluginBase
{ {
private readonly IServiceProvider _serviceProvider; private readonly INSender _nSender;
private readonly IMqSender _mqSender;
public TcpServiceReceivedPlugin(IServiceProvider serviceProvider, IMqSender mqSender) public TcpServiceReceivedPlugin(INSender nSender)
{ {
_serviceProvider = serviceProvider; _nSender = nSender;
_mqSender = mqSender;
} }
[GeneratorPlugin(typeof(ITcpReceivedPlugin))] [GeneratorPlugin(typeof(ITcpReceivedPlugin))]
public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e)
{ {
@ -36,16 +32,13 @@ namespace JiShe.CollectBus.Core.Plugins
//client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}"); //client.Logger.Info($"[TCP] 已从{client.GetIPPort()}接收到信息:{messageHexString}");
//await protocolPlugin.ReceivedAsync(client,e); //await protocolPlugin.ReceivedAsync(client,e);
await _nSender.SendToReceivedAsync(new MessageReceivedEvent
await _mqSender.SendToReportAsync(new ReportDto
{ {
ClientIP = client.IP, ClientIP = client.IP,
ClientId = client.Id, ClientId = client.Id,
Port = client.Port,
MessageHexString = Convert.ToHexString(e.ByteBlock.Span), MessageHexString = Convert.ToHexString(e.ByteBlock.Span),
DeviceNo = "111" Port = client.Port
}); });
await e.InvokeNext(); await e.InvokeNext();
} }

View File

@ -1,40 +1,13 @@
using System; using System.Threading.Tasks;
using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{ {
public abstract class BaseProtocolPlugin(IDistributedCache cache) : IProtocolPlugin public abstract class BaseProtocolPlugin : IProtocolPlugin
{ {
public abstract Task<ProtocolInfo> GetAsync(); public abstract Task<ProtocolInfo> GetAsync();
public virtual Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) public abstract Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent);
{
return Task.CompletedTask;
}
public virtual Task SendAsync()
{
return Task.CompletedTask;
}
public virtual async Task LoadAsync()
{
var result = cache.GetString("myKey");
if (result == null)
{
result = "Calculated Data";
cache.SetString("myKey", result, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10)
});
}
await Task.CompletedTask;
}
} }
} }

View File

@ -8,10 +8,6 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces
{ {
Task<ProtocolInfo> GetAsync(); Task<ProtocolInfo> GetAsync();
Task LoadAsync(); Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent);
Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e);
Task SendAsync();
} }
} }

View File

@ -0,0 +1,9 @@
namespace JiShe.CollectBus.Protocol.Contracts.Models
{
public class MessageIssuedEvent
{
public string ClientId { get; set; }
public byte[] Message { get; set; }
public string DeviceNo { get; set; }
}
}

View File

@ -1,13 +1,6 @@
using System; namespace JiShe.CollectBus.Protocol.Contracts.Models
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Models
{ {
public class ReportDto public class MessageReceivedEvent
{ {
/// <summary> /// <summary>
/// 客服端标识 /// 客服端标识

View File

@ -8,19 +8,14 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol.Test namespace JiShe.CollectBus.Protocol.Test
{ {
[ProtocolName("TestProtocol")] [ProtocolName("TestProtocol")]
public class TestProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency public class TestProtocolPlugin : BaseProtocolPlugin, ISingletonDependency
{ {
public override Task<ProtocolInfo> GetAsync() public override Task<ProtocolInfo> GetAsync()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
public override Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) public override Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent)
{
throw new NotImplementedException();
}
public override Task SendAsync()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }

View File

@ -3,13 +3,14 @@ using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using JiShe.CollectBus.Protocol.Contracts.Abstracts; using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Attributes; using JiShe.CollectBus.Protocol.Contracts.Attributes;
using JiShe.CollectBus.Protocol.Contracts.Models; using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.RabbitMQ.Senders;
using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Caching.Distributed;
using TouchSocket.Sockets; using TouchSocket.Sockets;
namespace JiShe.CollectBus.Protocol namespace JiShe.CollectBus.Protocol
{ {
[ProtocolName("StandardProtocol")] [ProtocolName("StandardProtocol")]
public class StandardProtocolPlugin(IDistributedCache cache) : BaseProtocolPlugin(cache), ISingletonDependency public class StandardProtocolPlugin(INSender nSender) : BaseProtocolPlugin, ISingletonDependency
{ {
//起始字符 //起始字符
private const string stx = "68"; private const string stx = "68";
@ -26,7 +27,7 @@ namespace JiShe.CollectBus.Protocol
static List<int> MSA = new List<int>(); static List<int> MSA = new List<int>();
static Dictionary<string, List<int>> usingMSA = new Dictionary<string, List<int>>(); static Dictionary<string, List<int>> usingMSA = new Dictionary<string, List<int>>();
private ITcpSessionClient tcpSessionClient; private string clientId = "";
static StandardProtocolPlugin() static StandardProtocolPlugin()
{ {
@ -43,16 +44,11 @@ namespace JiShe.CollectBus.Protocol
return await Task.FromResult(info); return await Task.FromResult(info);
} }
public new async Task LoadAsync() public override async Task AnalyzeAsync(MessageReceivedEvent messageReceivedEvent)
{ {
await base.LoadAsync(); clientId = messageReceivedEvent.ClientId;
}
public override async Task ReceivedAsync(ITcpSessionClient client, ReceivedDataEventArgs e) var cmdResult = AnalysisCmd(messageReceivedEvent.MessageHexString);
{
tcpSessionClient = client;
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
var cmdResult = AnalysisCmd(messageHexString);
if (cmdResult == null) if (cmdResult == null)
{ {
return; return;
@ -61,11 +57,6 @@ namespace JiShe.CollectBus.Protocol
await Task.CompletedTask; await Task.CompletedTask;
} }
public override async Task SendAsync()
{
await Task.CompletedTask;
}
/// <summary> /// <summary>
/// Gets the msa. /// Gets the msa.
/// </summary> /// </summary>
@ -294,9 +285,12 @@ namespace JiShe.CollectBus.Protocol
Fn = 1 Fn = 1
}; };
commandReulst.ReplyBytes = GetCommandBytes(reqParam); commandReulst.ReplyBytes = GetCommandBytes(reqParam);
tcpSessionClient.SendAsync(tcpSessionClient.Id,commandReulst.ReplyBytes); nSender.SendToIssuedAsync(new MessageIssuedEvent
{
ClientId = clientId,
DeviceNo = "",
Message = commandReulst.ReplyBytes
});
} }
else if (commandReulst.Fn == 2)//退出登录 else if (commandReulst.Fn == 2)//退出登录
{ {

View File

@ -0,0 +1,26 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageIssuedConsumer : IConsumer<MessageIssuedEvent>
{
private readonly ILogger<MessageIssuedEvent> _logger;
private readonly ITcpService _tcpService;
public MessageIssuedConsumer(ILogger<MessageIssuedEvent> logger, ITcpService tcpService)
{
_logger = logger;
_tcpService = tcpService;
}
public async Task Consume(ConsumeContext<MessageIssuedEvent> context)
{
await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
}
}
}

View File

@ -0,0 +1,38 @@
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Net.Sockets;
using JiShe.CollectBus.Protocol.Contracts.Models;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageReceivedConsumer : IConsumer<MessageReceivedEvent>
{
private readonly ILogger<MessageReceivedConsumer> _logger;
private readonly IServiceProvider _serviceProvider;
public MessageReceivedConsumer(ILogger<MessageReceivedConsumer> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
public async Task Consume(ConsumeContext<MessageReceivedEvent> context)
{
const string protocolType = "StandardProtocol";
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
}
else
{
await protocolPlugin.AnalyzeAsync(context.Message);
}
}
}
}

View File

@ -1,13 +0,0 @@
using MassTransit;
using JiShe.CollectBus.RabbitMQ.Models;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class PushConsumer : IConsumer<PushDto>
{
public async Task Consume(ConsumeContext<PushDto> context)
{
Console.WriteLine(context.Message.ClientId);
}
}
}

View File

@ -1,13 +0,0 @@
using JiShe.CollectBus.RabbitMQ.Models;
using MassTransit;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class ReportConsumer : IConsumer<ReportDto>
{
public async Task Consume(ConsumeContext<ReportDto> context)
{
Console.WriteLine(context.Message.ClientId);
}
}
}

View File

@ -1,55 +0,0 @@
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.Configuration;
using RabbitMQ.Client;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddMassTransit(this IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
cfg.ReceiveEndpoint(configuration["MQ:Queue:Push"] ?? string.Empty, x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<PushConsumer>();
x.Bind("commands", s =>
{
s.RoutingKey = configuration["MQ:Queue:Push"];
s.ExchangeType = ExchangeType.Direct;
});
});
cfg.ReceiveEndpoint(configuration["MQ:Queue:Report"] ?? string.Empty, x =>
{
x.ConfigureConsumeTopology = false;
x.Consumer<ReportConsumer>();
x.Bind("commands", s =>
{
s.RoutingKey = configuration["MQ:Queue:Report"];
s.ExchangeType = ExchangeType.Direct;
});
});
//cfg.UseRawJsonSerializer();
});
});
services.AddMassTransitHostedService();
return services;
}
}
}

View File

@ -15,6 +15,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -0,0 +1,45 @@
using System;
using JiShe.CollectBus.Common.Interfaces;
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
namespace JiShe.CollectBus.RabbitMQ
{
public class JiSheCollectBusRabbitMqModule: IJiSheModule
{
public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
{
var configuration = hostContext.Configuration;
services.AddMassTransit(x =>
{
x.AddConsumer<MessageReceivedConsumer>();
x.AddConsumer<MessageIssuedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
// 消息接收队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty,configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedConsumer>(context);
});
// 消息下发队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageIssuedConsumer>(context);
});
});
});
}
}
}

View File

@ -1,15 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Models
{
public class PushDto
{
public int ClientId { get; set; }
public string Message { get; set; }
public string DeviceNo { get; set; }
}
}

View File

@ -1,18 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public interface IMqSender
{
Task SendToPushAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToPushAsync(object message, CancellationToken cancellationToken = default);
Task SendToReportAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToReportAsync(object message, CancellationToken cancellationToken = default);
Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default);
}
}

View File

@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public interface INSender
{
Task SendToIssuedAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToIssuedAsync(object message, CancellationToken cancellationToken = default);
Task SendToReceivedAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
Task SendToReceivedAsync(object message, CancellationToken cancellationToken = default);
Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default);
}
}

View File

@ -1,52 +0,0 @@
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
using MassTransit;
using Microsoft.Extensions.Configuration;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public class MqSender : IMqSender, ISingletonDependency
{
private readonly ISendEndpointProvider _sendEndpointProvider;
private readonly IConfiguration _configuration;
public MqSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration)
{
_sendEndpointProvider = sendEndpointProvider;
_configuration = configuration;
}
public async Task SendToPushAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
var queueKey = _configuration["MQ:Queue:Push"];
await SendAsync(queueKey, message, cancellationToken);
}
public async Task SendToPushAsync(object message, CancellationToken cancellationToken = default)
{
var queueKey = _configuration["MQ:Queue:Push"];
await SendAsync(queueKey, message, cancellationToken);
}
public async Task SendToReportAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
var queueKey = _configuration["MQ:Queue:Report"];
//await SendAsync(queueName, message, cancellationToken);
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}"));
await endpoint.Send(message,typeof(T),cancellationToken);
}
public async Task SendToReportAsync(object message, CancellationToken cancellationToken = default)
{
var queueKey = _configuration["MQ:Queue:Report"];
await SendAsync(queueKey, message, cancellationToken);
}
public async Task SendAsync(string queueKey,object message, CancellationToken cancellationToken = default)
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}"));
await endpoint.Send(message, cancellationToken);
}
}
}

View File

@ -0,0 +1,53 @@
using MassTransit;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.Common.Extensions.DependencyInjections;
namespace JiShe.CollectBus.RabbitMQ.Senders
{
public class NSender:INSender,ISingletonDependency
{
private readonly ISendEndpointProvider _sendEndpointProvider;
private readonly string _issuedKey;
private readonly string _receivedKey;
public NSender(ISendEndpointProvider sendEndpointProvider, IConfiguration configuration)
{
_sendEndpointProvider = sendEndpointProvider;
_issuedKey = configuration["MQ:Queue:Issued"]!;
_receivedKey = configuration["MQ:Queue:Received"]!;
}
public async Task SendToIssuedAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
await SendAsync(_issuedKey, message, cancellationToken);
}
public async Task SendToIssuedAsync(object message, CancellationToken cancellationToken = default)
{
await SendAsync(_issuedKey, message, cancellationToken);
}
public async Task SendToReceivedAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
{
await SendAsync(_receivedKey, message, cancellationToken);
}
public async Task SendToReceivedAsync(object message, CancellationToken cancellationToken = default)
{
await SendAsync(_receivedKey, message, cancellationToken);
}
public async Task SendAsync(string queueKey, object message, CancellationToken cancellationToken = default)
{
var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri($"queue:{queueKey}"));
await endpoint.Send(message, cancellationToken);
}
}
}