From beb11c76f534c1bd0e951f52c9565584095df3a3 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 9 Apr 2025 14:31:48 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Subscribers/WorkerSubscriberAppService.cs | 22 ++---
.../CollectBusHostModule.Configure.cs | 81 ++++++++++---------
.../CollectBusHostModule.cs | 1 +
src/JiShe.CollectBus.Host/appsettings.json | 3 +-
.../Extensions/ProtocolConstExtensions.cs | 11 ++-
5 files changed, 65 insertions(+), 53 deletions(-)
diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
index 82672f6..4428fe4 100644
--- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
+++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs
@@ -148,7 +148,7 @@ namespace JiShe.CollectBus.Subscribers
#region 水表消息采集
///
- /// 一分钟定时抄读任务消息消费订阅
+ /// 1分钟采集水表数据下行消息消费订阅
///
///
///
@@ -157,11 +157,11 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
- _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理");
+ _logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
if (protocolPlugin == null)
{
- _logger.LogError("【1分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ _logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
}
else
{
@@ -175,20 +175,20 @@ namespace JiShe.CollectBus.Subscribers
}
///
- /// 5分钟采集电表数据下行消息消费订阅
+ /// 5分钟采集水表数据下行消息消费订阅
///
///
///
[HttpPost]
[Route("watermeter/fiveminute/issued-event")]
- [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)]
+ [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
- _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理");
+ _logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
if (protocolPlugin == null)
{
- _logger.LogError("【5分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ _logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
}
else
{
@@ -202,20 +202,20 @@ namespace JiShe.CollectBus.Subscribers
}
///
- /// 15分钟采集电表数据下行消息消费订阅
+ /// 15分钟采集水表数据下行消息消费订阅
///
///
///
[HttpPost]
[Route("watermeter/fifteenminute/issued-event")]
- [CapSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
+ [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage)
{
- _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");
+ _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理");
var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
if (protocolPlugin == null)
{
- _logger.LogError("【15分钟采集电表数据下行消息消费队列开始处理】协议不存在!");
+ _logger.LogError("【15分钟采集水表数据下行消息消费队列开始处理】协议不存在!");
}
else
{
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
index 32c39b5..96e1e18 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs
@@ -256,10 +256,7 @@ namespace JiShe.CollectBus.Host
/// The context.
/// The configuration.
public void ConfigureCap(ServiceConfigurationContext context, IConfiguration configuration)
- {
- List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
- topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
-
+ {
context.Services.AddCap(x =>
{
x.DefaultGroupName = ProtocolConst.SubscriberGroup;
@@ -287,40 +284,7 @@ namespace JiShe.CollectBus.Host
/// Configures the mass transit.
///
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
- {
-
-
- var adminClient = new AdminClientBuilder(new AdminClientConfig
- {
- BootstrapServers = configuration.GetConnectionString("Kafka")
- }).Build();
-
- try
- {
- List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
- topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
-
- List topicSpecifications = new List();
- foreach (var item in topics)
- {
- topicSpecifications.Add(new TopicSpecification
- {
- Name = item,
- NumPartitions = 3,
- ReplicationFactor = 1
- });
- }
- adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
- }
- catch (CreateTopicsException e)
- {
- if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
- {
- throw;
- }
- }
-
-
+ {
var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
var producerConfig = new ProducerConfig();
@@ -391,5 +355,46 @@ namespace JiShe.CollectBus.Host
});
});
}
+
+ ///
+ /// 配置Kafka主题
+ ///
+ ///
+ ///
+ public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration)
+ {
+ var adminClient = new AdminClientBuilder(new AdminClientConfig
+ {
+ BootstrapServers = configuration.GetConnectionString("Kafka")
+ }).Build();
+
+ try
+ {
+ string serverTagName = configuration.GetSection("ServerTagName").Value!;
+
+ List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
+ topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));
+
+ List topicSpecifications = new List();
+ foreach (var item in topics)
+ {
+ topicSpecifications.Add(new TopicSpecification
+ {
+ Name = item,
+ NumPartitions = 3,
+ ReplicationFactor = 1
+ });
+ }
+ adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+ catch (CreateTopicsException e)
+ {
+ if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
+ {
+ throw;
+ }
+ }
+
+ }
}
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
index 6f24b1e..f18fcdb 100644
--- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
+++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs
@@ -45,6 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context);
ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration);
+ ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
}
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index 9694e44..2ef7ded 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -81,5 +81,6 @@
"DataBaseName": "energy",
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
- }
+ },
+ "ServerTagName": "JiSheCollectBus"
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
index 8c097e2..67abc75 100644
--- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
+++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs
@@ -15,24 +15,27 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
///
- public static List GetAllTopicNamesByIssued()
+ public static List GetAllTopicNamesByIssued(string serverTagName)
{
- return typeof(ProtocolConst)
+ List topics = typeof(ProtocolConst)
.GetFields(BindingFlags.Public | BindingFlags.Static)
.Where(f =>
f.IsLiteral &&
!f.IsInitOnly &&
f.FieldType == typeof(string) &&
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
+ //.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}")
.Select(f => (string)f.GetRawConstantValue()!)
.ToList();
+
+ return topics;
}
///
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
///
- public static List GetAllTopicNamesByReceived()
+ public static List GetAllTopicNamesByReceived(string serverTagName)
{
//固定的上报主题
var topicList = typeof(ProtocolConst)
@@ -52,6 +55,8 @@ namespace JiShe.CollectBus.Protocol.Contracts
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
}
+ //return topicList.Select(f => $"{serverTagName}.{f}").ToList();
+
return topicList;
}
}