JiShe.CollectBus/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs
2024-11-08 13:50:47 +08:00

84 lines
3.7 KiB
C#

using JiShe.CollectBus.Common.Interfaces;
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Data;
namespace JiShe.CollectBus.RabbitMQ
{
public class JiSheCollectBusRabbitMqModule: IJiSheModule
{
public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
{
var configuration = hostContext.Configuration;
services.AddMassTransit(x =>
{
x.AddConsumer<MessageReceivedConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
//.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
x.AddConsumer<MessageIssuedConsumer>();
x.AddConsumer<MessageReceivedLoginConsumer>();
x.AddConsumer<MessageReceivedHeartbeatConsumer>();
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox(context);
//cfg.UseMessageRetry(r =>
//{
// r.Immediate(5);
// r.Handle<DataException>(x => x.Message.Contains("SQL"));
//});
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
// 消息接收队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty,configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedConsumer>(context);
configurator.Durable = true;
});
// 登录
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedLoginConsumer>(context);
configurator.Durable = true;
});
// 心跳
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedHeartbeatConsumer>(context);
configurator.Durable = true;
});
// 消息下发队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageIssuedConsumer>(context);
configurator.Durable = true;
});
});
});
}
}
}