diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs index febc2bf..3ec24ef 100644 --- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs +++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs @@ -27,7 +27,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// Gets or sets the instance. + /// Gets or sets the instance. /// /// /// The instance. @@ -65,7 +65,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 删除Kafka主题 + /// 删除Kafka主题 /// /// /// @@ -75,7 +75,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 获取Kafka主题列表 + /// 获取Kafka主题列表 /// /// public async Task> ListTopicsAsync() @@ -85,7 +85,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 判断Kafka主题是否存在 + /// 判断Kafka主题是否存在 /// /// /// @@ -96,7 +96,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 检测分区是否存在 + /// 检测分区是否存在 /// /// /// @@ -112,7 +112,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 检测分区是否存在 + /// 检测分区是否存在 /// /// /// @@ -127,7 +127,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 获取主题的分区数量 + /// 获取主题的分区数量 /// /// /// @@ -140,7 +140,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// Gets the instance. + /// Gets the instance. /// /// public IAdminClient GetInstance() @@ -160,7 +160,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// Checks the topic asynchronous. + /// Checks the topic asynchronous. /// /// The topic. /// @@ -171,7 +171,7 @@ public class AdminClientService : IAdminClientService, ISingletonDependency } /// - /// 判断Kafka主题是否存在 + /// 判断Kafka主题是否存在 /// /// 主题名称 /// 副本数量,不能高于Brokers数量 diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index ce76d19..685915d 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -135,6 +135,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { try { + // 扩展独立线程,避免阻塞 + _kafkaTaskScheduler.WorkerThreadExpansion(); await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; @@ -239,6 +241,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { try { + // 扩展独立线程,避免阻塞 + _kafkaTaskScheduler.WorkerThreadExpansion(); await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; @@ -367,6 +371,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { try { + // 扩展独立线程,避免阻塞 + _kafkaTaskScheduler.WorkerThreadExpansion(); await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { @@ -538,6 +544,8 @@ namespace JiShe.CollectBus.Kafka.Consumer { try { + // 扩展独立线程,避免阻塞 + _kafkaTaskScheduler.WorkerThreadExpansion(); await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index 28b719b..15eb60d 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -108,6 +108,8 @@ namespace JiShe.CollectBus.Kafka { logger.LogError(ex, "Kafka任务调度异常"); }; + //logger.LogWarning($"kafka订阅工作线程数:{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数"); + // // 订阅调度监控测试可打开 //_ = Task.Factory.StartNew(async () => // { @@ -235,15 +237,11 @@ namespace JiShe.CollectBus.Kafka private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); - var kafkaTaskScheduler = provider.GetRequiredService(); - + if (attr.EnableBatch) { Interlocked.Increment(ref _threadStartCount); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); - // 扩展独立线程,避免阻塞 - kafkaTaskScheduler.WorkerThreadExpansion(); - await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { try @@ -271,8 +269,6 @@ namespace JiShe.CollectBus.Kafka { Interlocked.Increment(ref _threadStartCount); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); - // 扩展独立线程,避免阻塞 - kafkaTaskScheduler.WorkerThreadExpansion(); await consumerService.SubscribeAsync(attr.Topic, async (message) => { try