JiShe.CollectBus/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs
2024-11-12 18:18:43 +08:00

86 lines
3.7 KiB
C#

using JiShe.CollectBus.Common.Interfaces;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Data;
namespace JiShe.CollectBus.RabbitMQ
{
public class JiSheCollectBusRabbitMqModule: IJiSheModule
{
public void ConfigureServices(ServiceContext context)
{
var configuration = context.Configuration;
context.Services.AddMassTransit(x =>
{
x.AddConsumer<MessageReceivedConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
//.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
x.AddConsumer<MessageIssuedConsumer>();
x.AddConsumer<MessageReceivedLoginConsumer>();
x.AddConsumer<MessageReceivedHeartbeatConsumer>();
x.AddConfigureEndpointsCallback((c, name, cfg) =>
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox(c);
//cfg.UseMessageRetry(r =>
//{
// r.Immediate(5);
// r.Handle<DataException>(x => x.Message.Contains("SQL"));
//});
});
x.UsingRabbitMq((c, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
// 消息接收队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty,configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedConsumer>(c);
configurator.Durable = true;
});
// 登录
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedLoginConsumer>(c);
configurator.Durable = true;
});
// 心跳
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedHeartbeatConsumer>(c);
configurator.Durable = true;
});
// 消息下发队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageIssuedConsumer>(c);
configurator.Durable = true;
});
});
});
}
}
}