using JiShe.CollectBus.Common.Interfaces;
using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Data;

namespace JiShe.CollectBus.RabbitMQ
{
    /// <summary>
    /// Module that registers MassTransit on the RabbitMQ transport, together with
    /// the consumers and receive endpoints declared in <see cref="ConfigureServices"/>.
    /// </summary>
    public class JiSheCollectBusRabbitMqModule: IJiSheModule
    {
        /// <summary>
        /// Adds MassTransit to <paramref name="services"/> and configures the RabbitMQ host
        /// and receive endpoints from the <c>MQ:*</c> keys of the host configuration.
        /// </summary>
        /// <param name="services">Service collection MassTransit is registered into.</param>
        /// <param name="hostContext">Host builder context whose configuration supplies the <c>MQ:*</c> settings.</param>
        /// <remarks>
        /// Keys read: <c>MQ:Host</c>, <c>MQ:Port</c>, <c>MQ:VirtualHost</c>, <c>MQ:UserName</c>,
        /// <c>MQ:Password</c>, <c>MQ:Queue:Received</c> and <c>MQ:Queue:Issued</c>.
        /// The RabbitMQ callback throws <see cref="FormatException"/> when <c>MQ:Port</c> is missing,
        /// blank or not numeric, and <see cref="OverflowException"/> when it is outside the
        /// <see cref="ushort"/> range.
        /// </remarks>
        public void ConfigureServices(IServiceCollection services, HostBuilderContext hostContext)
        {
            var configuration = hostContext.Configuration;

            services.AddMassTransit(x =>
            {
                // NOTE(review): the AddConsumer/Consumer calls in this method carry no explicit
                // consumer type argument here; confirm each one resolves to the intended consumer.

                // Consumer registered with batch options: message limit 100, time limit 1 second
                // measured from the last message, concurrency limit 10.
                x.AddConsumer(cfg =>
                {
                    cfg.Options(options => options
                        .SetMessageLimit(100)
                        .SetTimeLimit(s: 1)
                        .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
                        .SetConcurrencyLimit(10));
                });

                x.AddConsumer();
                x.AddConsumer();
                x.AddConsumer();

                // Endpoint middleware, registered in this order: delayed redelivery after
                // 5, 15 and 30 minutes, five immediate retries, then the in-memory outbox.
                // NOTE(review): the receive endpoints below are declared manually; confirm that
                // this callback is actually applied to them.
                x.AddConfigureEndpointsCallback((context, name, cfg) =>
                {
                    cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
                    cfg.UseMessageRetry(r => r.Immediate(5));
                    cfg.UseInMemoryOutbox(context);
                });

                x.UsingRabbitMq((context, cfg) =>
                {
                    // Fail with a message that names the key instead of the generic
                    // "input string was not in a correct format" raised by parsing "".
                    var rawPort = configuration["MQ:Port"];
                    if (string.IsNullOrWhiteSpace(rawPort))
                    {
                        throw new FormatException("Configuration value 'MQ:Port' is missing or empty; expected a port number between 0 and 65535.");
                    }

                    // The port is machine-readable configuration, so parse it culture-invariantly.
                    var port = ushort.Parse(
                        rawPort,
                        System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture);

                    cfg.Host(configuration["MQ:Host"], port, configuration["MQ:VirtualHost"], h =>
                    {
                        h.Username(configuration["MQ:UserName"] ?? string.Empty);
                        h.Password(configuration["MQ:Password"] ?? string.Empty);
                    });

                    // NOTE(review): when 'MQ:Queue:Received' is absent the endpoint names below
                    // become "", "_Login" and "_Heartbeat"; confirm that fallback is intended.
                    var receivedQueue = configuration["MQ:Queue:Received"];

                    // Message receive queue
                    cfg.ReceiveEndpoint(receivedQueue ?? string.Empty, endpoint =>
                    {
                        endpoint.ConfigureConsumeTopology = false;
                        endpoint.Consumer(context);
                        endpoint.Durable = true;
                    });

                    // Login
                    cfg.ReceiveEndpoint($"{receivedQueue}_Login", endpoint =>
                    {
                        endpoint.ConfigureConsumeTopology = false;
                        endpoint.Consumer(context);
                        endpoint.Durable = true;
                    });

                    // Heartbeat
                    cfg.ReceiveEndpoint($"{receivedQueue}_Heartbeat", endpoint =>
                    {
                        endpoint.ConfigureConsumeTopology = false;
                        endpoint.Consumer(context);
                        endpoint.Durable = true;
                    });

                    // Message issue (downlink) queue
                    cfg.ReceiveEndpoint(configuration["MQ:Queue:Issued"] ?? string.Empty, endpoint =>
                    {
                        endpoint.ConfigureConsumeTopology = false;
                        endpoint.Consumer(context);
                        endpoint.Durable = true;
                    });
                });
            });
        }
    }
}