From 2d2245fa854a4ea058ce77d110271949736e433a Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Fri, 23 May 2025 10:07:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E4=B8=80=E4=B8=AA=E5=88=86=E5=8C=BA=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E7=BA=BF=E7=A8=8B=EF=BC=8C=E8=A7=A3=E5=86=B3=E5=BD=B1?= =?UTF-8?q?=E5=93=8D=E4=B8=BB=E7=BA=BF=E7=A8=8B=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusKafkaModule.cs | 20 +- .../Consumer/ConsumerService.cs | 22 ++- .../Internal/KafkaOptionConfig.cs | 7 + .../Internal/KafkaTaskScheduler.cs | 177 ++++++++++++++++++ .../KafkaSubscribeExtensions.cs | 67 +++++-- .../appsettings.Production.json | 1 + web/JiShe.CollectBus.Host/appsettings.json | 3 +- 7 files changed, 260 insertions(+), 37 deletions(-) create mode 100644 modules/JiShe.CollectBus.Kafka/Internal/KafkaTaskScheduler.cs diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index d31b9ed..d7190b2 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -1,11 +1,13 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using System.Reflection; using Volo.Abp; using Volo.Abp.DependencyInjection; @@ -20,22 +22,14 @@ namespace JiShe.CollectBus.Kafka { var configuration = context.Services.GetConfiguration(); //var kafkaSection = configuration.GetSection(CommonConst.Kafka); - //KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig (); + //KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig(); //kafkaSection.Bind(kafkaOptionConfig); - //if (configuration[CommonConst.ServerTagName] != null) - //{ - // kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!; - //} - //context.Services.AddSingleton(kafkaOptionConfig); - - //context.Services.Configure(context.Services.GetConfiguration().GetSection(CommonConst.Kafka)); - + //Configure(kafkaSection); Configure(options => { configuration.GetSection(CommonConst.Kafka).Bind(options); }); - // 注册Producer context.Services.AddSingleton(); // 注册Consumer @@ -44,6 +38,12 @@ namespace JiShe.CollectBus.Kafka // 注册Polly context.Services.AddSingleton(); + var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + + // 注册任务调度 + context.Services.AddSingleton(); + //context.Services.AddHostedService(); } diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 31b2d9c..ce76d19 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -37,17 +37,21 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly KafkaPollyPipeline _kafkaPollyPipeline; + + private readonly KafkaTaskScheduler _kafkaTaskScheduler; + /// /// ConsumerService /// /// /// - public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions applicationOptions) + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions applicationOptions, KafkaTaskScheduler kafkaTaskScheduler) { _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; _applicationOptions = applicationOptions.Value; _kafkaPollyPipeline = kafkaPollyPipeline; + _kafkaTaskScheduler = kafkaTaskScheduler; } #region private 私有方法 @@ -148,7 +152,7 @@ namespace JiShe.CollectBus.Kafka.Consumer var cts = consumerStore.CTS; consumer!.Subscribe(topics); - _ = Task.Run(async () => + _= Task.Factory.StartNew(async () => { while (!cts.IsCancellationRequested) { @@ -209,7 +213,7 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, "处理消息时发生未知错误"); } } - }, cts.Token); + }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler); await Task.CompletedTask; }); @@ -254,7 +258,7 @@ namespace JiShe.CollectBus.Kafka.Consumer consumer!.Subscribe(topics); - _ = Task.Run(async () => + _ = Task.Factory.StartNew(async () => { int count = 0; while (!cts.IsCancellationRequested) @@ -314,7 +318,7 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, "处理消息时发生未知错误"); } } - }, cts.Token); + }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler); await Task.CompletedTask; }); } @@ -385,7 +389,7 @@ namespace JiShe.CollectBus.Kafka.Consumer var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - _ = Task.Run(async () => + _ = Task.Factory.StartNew(async () => { var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); var startTime = DateTime.UtcNow; @@ -482,7 +486,7 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, "处理批量消息时发生未知错误"); } } - }, cts.Token); + }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler); await Task.CompletedTask; }); @@ -555,7 +559,7 @@ namespace JiShe.CollectBus.Kafka.Consumer var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - _ = Task.Run(async () => + _ = Task.Factory.StartNew(async () => { var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); var startTime = DateTime.UtcNow; @@ -650,7 +654,7 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, "处理批量消息时发生未知错误"); } } - }, cts.Token); + }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler); await Task.CompletedTask; }); diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 38c9482..3b053c8 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -49,4 +49,11 @@ public class KafkaOptionConfig /// public string? SaslPassword { get; set; } + /// + /// 订阅任务线程数量 + /// 当主题未指定时,订阅任务线程数量默认为:-1 + /// 优先级低于订阅任务特性TaskCount值 + /// + public int TaskThreadCount { get; set; } = -1; + } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaTaskScheduler.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaTaskScheduler.cs new file mode 100644 index 0000000..c5443f4 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaTaskScheduler.cs @@ -0,0 +1,177 @@ +using Confluent.Kafka; +using Microsoft.AspNetCore.SignalR; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Internal +{ + public class KafkaTaskScheduler : TaskScheduler, IDisposable + { + private readonly BlockingCollection _tasksCollection=new BlockingCollection (); + private readonly List _workerThreads; + private readonly object _disposeLock = new object(); + private bool _isDisposed; + + /// + /// 当前队列中的任务数 + /// + public int QueuedTasks => _tasksCollection.Count; + + /// + /// 当前工作线程数 + /// + public int WorkerThreads => _workerThreads.Count; + + /// + /// 初始化任务调度器 + /// + public KafkaTaskScheduler() + { + // 默认最大并发线程数为CPU核心数 + int MaxConcurrencyLevel = Environment.ProcessorCount; + _workerThreads = new List(MaxConcurrencyLevel); + for (int i = 0; i < MaxConcurrencyLevel; i++) + { + var thread = new Thread(ExecuteScheduledTasks) + { + IsBackground = true, + Name = $"KafkaWorkerTask-{i + 1}" + }; + thread.Start(); + _workerThreads.Add(thread); + } + + } + + /// + /// 扩容工作线程调度 + /// 可以启动多个工作线程来处理任务 + /// + /// 扩展独立线程数(默认为1) + public void WorkerThreadExpansion(int taskNum = 1) + { + int currCount = WorkerThreads+1; + Parallel.For(0, taskNum, (index) => + { + var thread = new Thread(ExecuteScheduledTasks) + { + IsBackground = true, + Name = $"KafkaWorkerTask-{index+ currCount}" + }; + thread.Start(); + _workerThreads.Add(thread); + }); + } + + /// + /// 工作线程执行循环 + /// + private void ExecuteScheduledTasks() + { + try + { + foreach (var task in _tasksCollection.GetConsumingEnumerable()) + { + TryExecuteTaskSafely(task); + } + } + catch (OperationCanceledException) { } + catch (ObjectDisposedException) { } + } + + /// + /// 安全执行任务并处理异常 + /// + private void TryExecuteTaskSafely(Task task) + { + try + { + TryExecuteTask(task); + } + catch (OperationCanceledException){} + catch (Exception ex) + { + OnExceptionOccurred(ex); + } + } + + #region TaskScheduler 重写方法 + protected override IEnumerable GetScheduledTasks() + { + ThrowIfDisposed(); + return _tasksCollection.ToList(); + } + + protected override void QueueTask(Task task) + { + ThrowIfDisposed(); + _tasksCollection.Add(task); + } + + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // 禁止内联执行以强制所有任务在专用线程执行 + return false; + } + #endregion + + #region 释放资源 + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + lock (_disposeLock) + { + if (_isDisposed) return; + + if (disposing) + { + // 停止接收新任务 + _tasksCollection.CompleteAdding(); + + // 等待所有工作线程退出 + foreach (var thread in _workerThreads) + { + if (thread.IsAlive) + { + thread.Join(TimeSpan.FromSeconds(5)); + } + } + // 释放资源 + _tasksCollection.Dispose(); + } + _isDisposed = true; + } + } + + private void ThrowIfDisposed() + { + if (_isDisposed) + { + throw new ObjectDisposedException(GetType().Name); + } + } + #endregion + + #region 异常事件处理 + /// + /// 任务执行异常时触发 + /// + public event Action? ExceptionEvent; + + private void OnExceptionOccurred(Exception ex) + { + ExceptionEvent?.Invoke(ex); + } + #endregion + } +} diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index ccbe540..28b719b 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -33,11 +33,11 @@ namespace JiShe.CollectBus.Kafka var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - foreach (var item in topics) { kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } + } /// @@ -69,7 +69,7 @@ namespace JiShe.CollectBus.Kafka // 实现IKafkaSubscribe接口 var subscribeTypes = assembly.GetTypes().Where(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) && - !type.IsAbstract && !type.IsInterface).ToList(); + !type.IsAbstract && !type.IsInterface).ToList(); if (subscribeTypes.Count == 0) continue; @@ -77,7 +77,7 @@ namespace JiShe.CollectBus.Kafka Parallel.ForEach(subscribeTypes, subscribeType => { var subscribes = provider.GetServices(subscribeType).ToList(); - Parallel.ForEach(subscribes,subscribe => + Parallel.ForEach(subscribes, subscribe => { if (subscribe != null) { @@ -102,7 +102,24 @@ namespace JiShe.CollectBus.Kafka //} } logger.LogWarning($"kafka订阅主题:{_topicSubscribeCount}数,共启动:{_threadCount}线程"); + + var kafkaTaskScheduler = provider.GetRequiredService(); + kafkaTaskScheduler.ExceptionEvent += (ex) => + { + logger.LogError(ex, "Kafka任务调度异常"); + }; + // 订阅调度监控测试可打开 + //_ = Task.Factory.StartNew(async () => + // { + // while (true) + // { + // logger.LogWarning($"kafka订阅工作线程数:{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数"); + // await Task.Delay(TimeSpan.FromSeconds(5)); + // } + // }); + }); + } /// @@ -123,20 +140,35 @@ namespace JiShe.CollectBus.Kafka .ToList(); if (subscribeTypes.Count == 0) return; - foreach (var subscribeType in subscribeTypes) + + Parallel.ForEach(subscribeTypes, subscribeType => { var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(subscribe => + Parallel.ForEach(subscribes, subscribe => { - if (subscribe != null) { Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); - threadCount += tuple.Item1; - topicCount += tuple.Item2; + //threadCount += tuple.Item1; + //topicCount += tuple.Item2; } }); - } + }); + + //foreach (var subscribeType in subscribeTypes) + //{ + // var subscribes = provider.GetServices(subscribeType).ToList(); + // subscribes.ForEach(subscribe => + // { + + // if (subscribe != null) + // { + // Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); + // threadCount += tuple.Item1; + // topicCount += tuple.Item2; + // } + // }); + //} logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); }); } @@ -156,13 +188,10 @@ namespace JiShe.CollectBus.Kafka Parallel.ForEach(subscribedMethods, sub => { Interlocked.Increment(ref _topicSubscribeCount); - int partitionCount = sub.Attribute!.TaskCount == -1 ? 3 : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; - var adminClientService = provider.GetRequiredService(); - + var adminClientService = provider.GetRequiredService(); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); - - //int partitionCount = sub.Attribute!.TaskCount == -1 ? topicCount : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; - + // 可以根据配置文件TaskThreadCount来配置线程数 + int partitionCount = sub.Attribute!.TaskCount == -1 ? (kafkaOptionConfig.TaskThreadCount==-1? topicCount: kafkaOptionConfig.TaskThreadCount) : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; partitionCount = partitionCount > topicCount ? topicCount : partitionCount; //partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) @@ -179,7 +208,7 @@ namespace JiShe.CollectBus.Kafka //foreach (var sub in subscribedMethods) //{ - // //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; + //// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; // var adminClientService = provider.GetRequiredService(); // int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); @@ -206,11 +235,15 @@ namespace JiShe.CollectBus.Kafka private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); + var kafkaTaskScheduler = provider.GetRequiredService(); if (attr.EnableBatch) { Interlocked.Increment(ref _threadStartCount); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); + // 扩展独立线程,避免阻塞 + kafkaTaskScheduler.WorkerThreadExpansion(); + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { try @@ -238,6 +271,8 @@ namespace JiShe.CollectBus.Kafka { Interlocked.Increment(ref _threadStartCount); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); + // 扩展独立线程,避免阻塞 + kafkaTaskScheduler.WorkerThreadExpansion(); await consumerService.SubscribeAsync(attr.Topic, async (message) => { try diff --git a/web/JiShe.CollectBus.Host/appsettings.Production.json b/web/JiShe.CollectBus.Host/appsettings.Production.json index 3dcd888..3fa91ed 100644 --- a/web/JiShe.CollectBus.Host/appsettings.Production.json +++ b/web/JiShe.CollectBus.Host/appsettings.Production.json @@ -18,6 +18,7 @@ "SaslPassword": "lixiao@1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, + "TaskThreadCount": -1, "ServerTagName": "JiSheCollectBus100", "FirstCollectionTime": "2025-04-22 16:07:00" }, diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index b6baf70..6bfcc45 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -15,12 +15,11 @@ "BootstrapServers": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092", "EnableFilter": true, "EnableAuthorization": false, - "SecurityProtocol": "SaslPlaintext", - "SaslMechanism": "Plain", "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, + "TaskThreadCount": -1, "FirstCollectionTime": "2025-04-22 16:07:00" }, "IoTDBOptions": {