From 838ef197e27663f5ad6bfc655257e0e4fcbc2979 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 2 Apr 2025 09:42:04 +0800
Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E8=87=AA=E5=8A=A8=E5=88=9B?=
=?UTF-8?q?=E5=BB=BA=E4=B8=BB=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../CollectBusHostModule.Configure.cs | 55 ++++++++++++-------
.../Extensions/ProtocolConstExtensions.cs | 29 ++++++++++
2 files changed, 64 insertions(+), 20 deletions(-)
create mode 100644 src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index f0a02c9..3516bbb 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -22,6 +22,7 @@ using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka;
+using MassTransit.SqlTransport.Topology;
namespace JiShe.CollectBus.Host
@@ -273,6 +274,11 @@ namespace JiShe.CollectBus.Host
}
+ ///
+ /// Configures the mass transit.
+ ///
+ /// The context.
+ /// The configuration.
///
/// Configures the mass transit.
///
@@ -280,20 +286,29 @@ namespace JiShe.CollectBus.Host
/// The configuration.
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{
+ var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
+ var producerConfig = new ProducerConfig();
- //context.Services.AddSingleton();
-
- context.Services.AddMassTransit(x =>
+ context.Services
+ .ConfigureKafkaTestOptions(options =>
{
- x.UsingInMemory();
-
- x.AddConfigureEndpointsCallback((c, name, cfg) =>
- {
- cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
- cfg.UseMessageRetry(r => r.Immediate(5));
- cfg.UseInMemoryOutbox(c);
- });
+#if DEBUG
+ options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要
+#endif
+ options.CreateTopicsIfNotExists = true;
+ options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
+ })
+ .AddMassTransit(x =>
+ {
+ x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
+
+ //x.AddConfigureEndpointsCallback((c, name, cfg) =>
+ //{
+ // cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
+ // cfg.UseMessageRetry(r => r.Immediate(5));
+ // cfg.UseInMemoryOutbox(c);
+ //});
x.AddRider(rider =>
{
@@ -310,37 +325,37 @@ namespace JiShe.CollectBus.Host
});
rider.AddConsumer();
-
+
rider.UsingKafka((c, cfg) =>
{
-
- cfg.Host(configuration.GetConnectionString("Kafka"));
+ List hosts = new List() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" };
+ cfg.Host(hosts);
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator =>
{
- configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
+ configurator.ConfigureConsumer(c);
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedLoginEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.SubscriberIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
});
- cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, ProtocolConst.SubscriberGroup, configurator =>
+ cfg.TopicEndpoint(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, consumerConfig, configurator =>
{
configurator.ConfigureConsumer(c);
configurator.AutoOffsetReset = AutoOffsetReset.Earliest;
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
new file mode 100644
index 0000000..9455144
--- /dev/null
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
@@ -0,0 +1,29 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Protocol.Contracts
+{
+ public class ProtocolConstExtensions
+ {
+ ///
+ /// 自动获取 ProtocolConst 类中所有 Kafka 主题名称
+ /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
+ ///
+ public static List GetAllTopicNames()
+ {
+ return typeof(ProtocolConst)
+ .GetFields(BindingFlags.Public | BindingFlags.Static)
+ .Where(f =>
+ f.IsLiteral &&
+ !f.IsInitOnly &&
+ f.FieldType == typeof(string) &&
+ f.Name.EndsWith("EventName")) // 通过命名规则过滤主题字段
+ .Select(f => (string)f.GetRawConstantValue()!)
+ .ToList();
+ }
+ }
+}