尝试自动创建主题

This commit is contained in:
ChenYi 2025-04-02 09:42:04 +08:00
parent 5a294b437c
commit 838ef197e2
2 changed files with 64 additions and 20 deletions

View File

@ -22,6 +22,7 @@ using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka;
using MassTransit.SqlTransport.Topology;
namespace JiShe.CollectBus.Host
@ -273,6 +274,11 @@ namespace JiShe.CollectBus.Host
}
/// <summary>
/// Configures the mass transit.
/// </summary>
/// <param name="context">The context.</param>
/// <param name="configuration">The configuration.</param>
/// <summary>
/// Configures the mass transit.
/// </summary>
@ -280,20 +286,29 @@ namespace JiShe.CollectBus.Host
/// <param name="configuration">The configuration.</param>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
var producerConfig = new ProducerConfig();
//context.Services.AddSingleton<IBus>();
context.Services.AddMassTransit(x =>
context.Services
.ConfigureKafkaTestOptions(options =>
{
x.UsingInMemory();
#if DEBUG
options.CleanTopicsOnStart = true;// 测试时每次启动都删除topic,生产环境不需要
#endif
options.CreateTopicsIfNotExists = true;
options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
})
.AddMassTransit(x =>
{
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
x.AddConfigureEndpointsCallback((c, name, cfg) =>
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox(c);
});
//x.AddConfigureEndpointsCallback((c, name, cfg) =>
//{
// cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
// cfg.UseMessageRetry(r => r.Immediate(5));
// cfg.UseInMemoryOutbox(c);
//});
x.AddRider(rider =>
{
@ -313,34 +328,34 @@ namespace JiShe.CollectBus.Host
rider.UsingKafka((c, cfg) =>
{
List<string> hosts = new List<string>() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" };
cfg.Host(hosts);
cfg.Host(configuration.GetConnectionString("Kafka"));
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator =>
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
configurator.ConfigureConsumer<ReceivedHeartbeatConsumer>(c);
});
cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator =>
cfg.TopicEndpoint<MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer<ReceivedLoginConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator =>
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer<ReceivedConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
cfg.TopicEndpoint<MessageReceived>(ProtocolConst.SubscriberIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer<IssuedConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
cfg.TopicEndpoint<ScheduledMeterReadingIssuedEventMessage>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer<ScheduledMeterReadingConsumer>(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;

View File

@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts
{
public class ProtocolConstExtensions
{
/// <summary>
/// 自动获取 ProtocolConst 类中所有 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary>
public static List<string> GetAllTopicNames()
{
return typeof(ProtocolConst)
.GetFields(BindingFlags.Public | BindingFlags.Static)
.Where(f =>
f.IsLiteral &&
!f.IsInitOnly &&
f.FieldType == typeof(string) &&
f.Name.EndsWith("EventName")) // 通过命名规则过滤主题字段
.Select(f => (string)f.GetRawConstantValue()!)
.ToList();
}
}
}