JiShe.CollectBus/JiShe.CollectBus.Service/JiSheCollectBusMQConsumerModule.cs
2024-11-13 17:50:52 +08:00

88 lines
3.8 KiB
C#

using JiShe.CollectBus.Network;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp;
using Volo.Abp.Modularity;
namespace JiShe.CollectBus.MQ.Consumer
{
[DependsOn(typeof(JiSheCollectBusNetworkModule))]
public class JiSheCollectBusMqConsumerModule: AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
context.Services.AddMassTransit(x =>
{
x.AddConsumer<MessageReceivedConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
//.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
x.AddConsumer<MessageIssuedConsumer>();
x.AddConsumer<MessageReceivedLoginConsumer>();
x.AddConsumer<MessageReceivedHeartbeatConsumer>();
x.AddConfigureEndpointsCallback((c, name, cfg) =>
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox(c);
//cfg.UseMessageRetry(r =>
//{
// r.Immediate(5);
// r.Handle<DataException>(x => x.Message.Contains("SQL"));
//});
});
x.UsingRabbitMq((c, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>
{
h.Username(configuration["MQ:UserName"] ?? string.Empty);
h.Password(configuration["MQ:Password"] ?? string.Empty);
});
// 消息接收队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Received"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedConsumer>(c);
configurator.Durable = true;
});
// 登录
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Login", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedLoginConsumer>(c);
configurator.Durable = true;
});
// 心跳
cfg.ReceiveEndpoint($"{configuration["MQ:Queue:Received"]}_Heartbeat", configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageReceivedHeartbeatConsumer>(c);
configurator.Durable = true;
});
// 消息下发队列
cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, configurator =>
{
configurator.ConfigureConsumeTopology = false;
configurator.Consumer<MessageIssuedConsumer>(c);
configurator.Durable = true;
});
});
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
}
}
}