From 179b9a2e91e13e5314a91c6989e018ff4fb1843d Mon Sep 17 00:00:00 2001
From: zenghongyao <873884283@qq.com>
Date: Fri, 23 May 2025 14:01:33 +0800
Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../AdminClient/AdminClientService.cs | 20 +++++++++----------
.../Consumer/ConsumerService.cs | 8 ++++++++
.../KafkaSubscribeExtensions.cs | 10 +++-------
3 files changed, 21 insertions(+), 17 deletions(-)
diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
index febc2bf..3ec24ef 100644
--- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
+++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
@@ -27,7 +27,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// Gets or sets the instance.
+ /// Gets or sets the instance.
///
///
/// The instance.
@@ -65,7 +65,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 删除Kafka主题
+ /// 删除Kafka主题
///
///
///
@@ -75,7 +75,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 获取Kafka主题列表
+ /// 获取Kafka主题列表
///
///
public async Task> ListTopicsAsync()
@@ -85,7 +85,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 判断Kafka主题是否存在
+ /// 判断Kafka主题是否存在
///
///
///
@@ -96,7 +96,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 检测分区是否存在
+ /// 检测分区是否存在
///
///
///
@@ -112,7 +112,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 检测分区是否存在
+ /// 检测分区是否存在
///
///
///
@@ -127,7 +127,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 获取主题的分区数量
+ /// 获取主题的分区数量
///
///
///
@@ -140,7 +140,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// Gets the instance.
+ /// Gets the instance.
///
///
public IAdminClient GetInstance()
@@ -160,7 +160,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// Checks the topic asynchronous.
+ /// Checks the topic asynchronous.
///
/// The topic.
///
@@ -171,7 +171,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency
}
///
- /// 判断Kafka主题是否存在
+ /// 判断Kafka主题是否存在
///
/// 主题名称
/// 副本数量,不能高于Brokers数量
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index ce76d19..685915d 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -135,6 +135,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
+ // 扩展独立线程,避免阻塞
+ _kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
@@ -239,6 +241,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
+ // 扩展独立线程,避免阻塞
+ _kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
@@ -367,6 +371,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
+ // 扩展独立线程,避免阻塞
+ _kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
@@ -538,6 +544,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
try
{
+ // 扩展独立线程,避免阻塞
+ _kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
index 28b719b..15eb60d 100644
--- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
+++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
@@ -108,6 +108,8 @@ namespace JiShe.CollectBus.Kafka
{
logger.LogError(ex, "Kafka任务调度异常");
};
+ //logger.LogWarning($"kafka订阅工作线程数:{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
+ //
// 订阅调度监控测试可打开
//_ = Task.Factory.StartNew(async () =>
// {
@@ -235,15 +237,11 @@ namespace JiShe.CollectBus.Kafka
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger)
{
var consumerService = provider.GetRequiredService();
- var kafkaTaskScheduler = provider.GetRequiredService();
-
+
if (attr.EnableBatch)
{
Interlocked.Increment(ref _threadStartCount);
logger.LogInformation($"kafka开启线程消费:{_threadStartCount}");
- // 扩展独立线程,避免阻塞
- kafkaTaskScheduler.WorkerThreadExpansion();
-
await consumerService.SubscribeBatchAsync(attr.Topic, async (message) =>
{
try
@@ -271,8 +269,6 @@ namespace JiShe.CollectBus.Kafka
{
Interlocked.Increment(ref _threadStartCount);
logger.LogInformation($"kafka开启线程消费:{_threadStartCount}");
- // 扩展独立线程,避免阻塞
- kafkaTaskScheduler.WorkerThreadExpansion();
await consumerService.SubscribeAsync(attr.Topic, async (message) =>
{
try