JiShe.CollectBus/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs

87 lines
3.8 KiB
C#
Raw Normal View History

2024-11-13 00:30:24 +08:00
using JiShe.CollectBus.RabbitMQ.Consumers;
2024-10-28 16:23:39 +08:00
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
2024-11-13 00:30:24 +08:00
using Volo.Abp;
using Volo.Abp.Modularity;
2024-10-28 16:23:39 +08:00
namespace JiShe.CollectBus.RabbitMQ
{
2024-11-13 00:30:24 +08:00
public class JiSheCollectBusRabbitMqModule: AbpModule
2024-10-28 16:23:39 +08:00
{
2024-11-13 00:30:24 +08:00
public override void ConfigureServices(ServiceConfigurationContext context)
2024-10-28 16:23:39 +08:00
{
2024-11-13 00:30:24 +08:00
var configuration = context.Services.GetConfiguration();
2024-11-12 18:18:43 +08:00
context.Services.AddMassTransit(x =>
2024-10-28 16:23:39 +08:00
{
2024-11-08 13:50:47 +08:00
x.AddConsumer<MessageReceivedConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
//.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
2024-10-28 16:23:39 +08:00
x.AddConsumer<MessageIssuedConsumer>();
2024-10-29 16:28:14 +08:00
x.AddConsumer<MessageReceivedLoginConsumer>();
x.AddConsumer<MessageReceivedHeartbeatConsumer>();
2024-11-08 13:50:47 +08:00
2024-11-12 18:18:43 +08:00
x.AddConfigureEndpointsCallback((c, name, cfg) =>
2024-11-08 13:50:47 +08:00
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
2024-11-12 18:18:43 +08:00
cfg.UseInMemoryOutbox(c);
2024-11-08 13:50:47 +08:00
//cfg.UseMessageRetry(r =>
//{
// r.Immediate(5);
// r.Handle<DataException>(x => x.Message.Contains("SQL"));
//});
});
2024-11-12 18:18:43 +08:00
x.UsingRabbitMq((c, cfg) =>
2024-10-28 16:23:39 +08:00
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
// 消息接收队列
2024-11-13 00:30:24 +08:00
cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty, configurator =>
2024-10-28 16:23:39 +08:00
{
configurator.ConfigureConsumeTopology = false;
2024-11-12 18:18:43 +08:00
configurator.Consumer<MessageReceivedConsumer>(c);
2024-10-30 17:49:05 +08:00
configurator.Durable = true;
2024-10-28 16:23:39 +08:00
});
2024-10-29 16:28:14 +08:00
// 登录
2024-10-30 17:49:05 +08:00
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator =>
2024-10-29 16:28:14 +08:00
{
configurator.ConfigureConsumeTopology = false;
2024-11-12 18:18:43 +08:00
configurator.Consumer<MessageReceivedLoginConsumer>(c);
2024-10-30 17:49:05 +08:00
configurator.Durable = true;
2024-10-29 16:28:14 +08:00
});
// 心跳
2024-10-30 17:49:05 +08:00
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator =>
2024-10-29 16:28:14 +08:00
{
configurator.ConfigureConsumeTopology = false;
2024-11-12 18:18:43 +08:00
configurator.Consumer<MessageReceivedHeartbeatConsumer>(c);
2024-10-30 17:49:05 +08:00
configurator.Durable = true;
2024-10-29 16:28:14 +08:00
});
2024-10-28 16:23:39 +08:00
// 消息下发队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
2024-11-12 18:18:43 +08:00
configurator.Consumer<MessageIssuedConsumer>(c);
2024-10-30 17:49:05 +08:00
configurator.Durable = true;
2024-10-28 16:23:39 +08:00
});
});
});
}
2024-11-13 00:30:24 +08:00
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
}
2024-10-28 16:23:39 +08:00
}
}