优化kafka订阅消费一个分区一个线程,解决影响主线程问题

This commit is contained in:
zenghongyao 2025-05-23 10:07:47 +08:00
parent 7dd833257a
commit 2d2245fa85
7 changed files with 260 additions and 37 deletions

View File

@ -1,11 +1,13 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System.Reflection; using System.Reflection;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
@ -22,20 +24,12 @@ namespace JiShe.CollectBus.Kafka
//var kafkaSection = configuration.GetSection(CommonConst.Kafka); //var kafkaSection = configuration.GetSection(CommonConst.Kafka);
//KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig(); //KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig();
//kafkaSection.Bind(kafkaOptionConfig); //kafkaSection.Bind(kafkaOptionConfig);
//if (configuration[CommonConst.ServerTagName] != null) //Configure<KafkaOptionConfig>(kafkaSection);
//{
// kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!;
//}
//context.Services.AddSingleton(kafkaOptionConfig);
//context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
Configure<KafkaOptionConfig>(options => Configure<KafkaOptionConfig>(options =>
{ {
configuration.GetSection(CommonConst.Kafka).Bind(options); configuration.GetSection(CommonConst.Kafka).Bind(options);
}); });
// 注册Producer // 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>(); context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer // 注册Consumer
@ -44,6 +38,12 @@ namespace JiShe.CollectBus.Kafka
// 注册Polly // 注册Polly
context.Services.AddSingleton<KafkaPollyPipeline>(); context.Services.AddSingleton<KafkaPollyPipeline>();
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
// 注册任务调度
context.Services.AddSingleton<KafkaTaskScheduler>();
//context.Services.AddHostedService<HostedService>(); //context.Services.AddHostedService<HostedService>();
} }

View File

@ -37,17 +37,21 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaPollyPipeline _kafkaPollyPipeline; private readonly KafkaPollyPipeline _kafkaPollyPipeline;
private readonly KafkaTaskScheduler _kafkaTaskScheduler;
/// <summary> /// <summary>
/// ConsumerService /// ConsumerService
/// </summary> /// </summary>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param> /// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions) public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions, KafkaTaskScheduler kafkaTaskScheduler)
{ {
_logger = logger; _logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value; _kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value; _applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline; _kafkaPollyPipeline = kafkaPollyPipeline;
_kafkaTaskScheduler = kafkaTaskScheduler;
} }
#region private #region private
@ -148,7 +152,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
var cts = consumerStore.CTS; var cts = consumerStore.CTS;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => _= Task.Factory.StartNew(async () =>
{ {
while (!cts.IsCancellationRequested) while (!cts.IsCancellationRequested)
{ {
@ -209,7 +213,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, "处理消息时发生未知错误"); _logger.LogError(ex, "处理消息时发生未知错误");
} }
} }
}, cts.Token); }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask; await Task.CompletedTask;
}); });
@ -254,7 +258,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => _ = Task.Factory.StartNew(async () =>
{ {
int count = 0; int count = 0;
while (!cts.IsCancellationRequested) while (!cts.IsCancellationRequested)
@ -314,7 +318,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, "处理消息时发生未知错误"); _logger.LogError(ex, "处理消息时发生未知错误");
} }
} }
}, cts.Token); }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask; await Task.CompletedTask;
}); });
} }
@ -385,7 +389,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () => _ = Task.Factory.StartNew(async () =>
{ {
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow; var startTime = DateTime.UtcNow;
@ -482,7 +486,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, "处理批量消息时发生未知错误"); _logger.LogError(ex, "处理批量消息时发生未知错误");
} }
} }
}, cts.Token); }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask; await Task.CompletedTask;
}); });
@ -555,7 +559,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () => _ = Task.Factory.StartNew(async () =>
{ {
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow; var startTime = DateTime.UtcNow;
@ -650,7 +654,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, "处理批量消息时发生未知错误"); _logger.LogError(ex, "处理批量消息时发生未知错误");
} }
} }
}, cts.Token); }, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask; await Task.CompletedTask;
}); });

View File

@ -49,4 +49,11 @@ public class KafkaOptionConfig
/// </summary> /// </summary>
public string? SaslPassword { get; set; } public string? SaslPassword { get; set; }
/// <summary>
/// 订阅任务线程数量
/// 当主题未指定时,订阅任务线程数量默认为:-1
/// 优先级低于订阅任务特性TaskCount值
/// </summary>
public int TaskThreadCount { get; set; } = -1;
} }

View File

@ -0,0 +1,177 @@
using Confluent.Kafka;
using Microsoft.AspNetCore.SignalR;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Internal
{
public class KafkaTaskScheduler : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _tasksCollection=new BlockingCollection<Task> ();
private readonly List<Thread> _workerThreads;
private readonly object _disposeLock = new object();
private bool _isDisposed;
/// <summary>
/// 当前队列中的任务数
/// </summary>
public int QueuedTasks => _tasksCollection.Count;
/// <summary>
/// 当前工作线程数
/// </summary>
public int WorkerThreads => _workerThreads.Count;
/// <summary>
/// 初始化任务调度器
/// </summary>
public KafkaTaskScheduler()
{
// 默认最大并发线程数为CPU核心数
int MaxConcurrencyLevel = Environment.ProcessorCount;
_workerThreads = new List<Thread>(MaxConcurrencyLevel);
for (int i = 0; i < MaxConcurrencyLevel; i++)
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{i + 1}"
};
thread.Start();
_workerThreads.Add(thread);
}
}
/// <summary>
/// 扩容工作线程调度
/// 可以启动多个工作线程来处理任务
/// </summary>
/// <param name="taskNum">扩展独立线程数(默认为1)</param>
public void WorkerThreadExpansion(int taskNum = 1)
{
int currCount = WorkerThreads+1;
Parallel.For(0, taskNum, (index) =>
{
var thread = new Thread(ExecuteScheduledTasks)
{
IsBackground = true,
Name = $"KafkaWorkerTask-{index+ currCount}"
};
thread.Start();
_workerThreads.Add(thread);
});
}
/// <summary>
/// 工作线程执行循环
/// </summary>
private void ExecuteScheduledTasks()
{
try
{
foreach (var task in _tasksCollection.GetConsumingEnumerable())
{
TryExecuteTaskSafely(task);
}
}
catch (OperationCanceledException) { }
catch (ObjectDisposedException) { }
}
/// <summary>
/// 安全执行任务并处理异常
/// </summary>
private void TryExecuteTaskSafely(Task task)
{
try
{
TryExecuteTask(task);
}
catch (OperationCanceledException){}
catch (Exception ex)
{
OnExceptionOccurred(ex);
}
}
#region TaskScheduler
protected override IEnumerable<Task> GetScheduledTasks()
{
ThrowIfDisposed();
return _tasksCollection.ToList();
}
protected override void QueueTask(Task task)
{
ThrowIfDisposed();
_tasksCollection.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 禁止内联执行以强制所有任务在专用线程执行
return false;
}
#endregion
#region
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
lock (_disposeLock)
{
if (_isDisposed) return;
if (disposing)
{
// 停止接收新任务
_tasksCollection.CompleteAdding();
// 等待所有工作线程退出
foreach (var thread in _workerThreads)
{
if (thread.IsAlive)
{
thread.Join(TimeSpan.FromSeconds(5));
}
}
// 释放资源
_tasksCollection.Dispose();
}
_isDisposed = true;
}
}
private void ThrowIfDisposed()
{
if (_isDisposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
#endregion
#region
/// <summary>
/// 任务执行异常时触发
/// </summary>
public event Action<Exception>? ExceptionEvent;
private void OnExceptionOccurred(Exception ex)
{
ExceptionEvent?.Invoke(ex);
}
#endregion
}
}

View File

@ -33,11 +33,11 @@ namespace JiShe.CollectBus.Kafka
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics) foreach (var item in topics)
{ {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} }
} }
/// <summary> /// <summary>
@ -102,7 +102,24 @@ namespace JiShe.CollectBus.Kafka
//} //}
} }
logger.LogWarning($"kafka订阅主题:{_topicSubscribeCount}数,共启动:{_threadCount}线程"); logger.LogWarning($"kafka订阅主题:{_topicSubscribeCount}数,共启动:{_threadCount}线程");
var kafkaTaskScheduler = provider.GetRequiredService<KafkaTaskScheduler>();
kafkaTaskScheduler.ExceptionEvent += (ex) =>
{
logger.LogError(ex, "Kafka任务调度异常");
};
// 订阅调度监控测试可打开
//_ = Task.Factory.StartNew(async () =>
// {
// while (true)
// {
// logger.LogWarning($"kafka订阅工作线程数{kafkaTaskScheduler.WorkerThreads},队列任务:{kafkaTaskScheduler.QueuedTasks}数");
// await Task.Delay(TimeSpan.FromSeconds(5));
// }
// });
}); });
} }
/// <summary> /// <summary>
@ -123,20 +140,35 @@ namespace JiShe.CollectBus.Kafka
.ToList(); .ToList();
if (subscribeTypes.Count == 0) return; if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
Parallel.ForEach(subscribeTypes, subscribeType =>
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => Parallel.ForEach(subscribes, subscribe =>
{ {
if (subscribe != null) if (subscribe != null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; //threadCount += tuple.Item1;
topicCount += tuple.Item2; //topicCount += tuple.Item2;
} }
}); });
} });
//foreach (var subscribeType in subscribeTypes)
//{
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(subscribe =>
// {
// if (subscribe != null)
// {
// Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
//}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); });
} }
@ -156,13 +188,10 @@ namespace JiShe.CollectBus.Kafka
Parallel.ForEach(subscribedMethods, sub => Parallel.ForEach(subscribedMethods, sub =>
{ {
Interlocked.Increment(ref _topicSubscribeCount); Interlocked.Increment(ref _topicSubscribeCount);
int partitionCount = sub.Attribute!.TaskCount == -1 ? 3 : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
var adminClientService = provider.GetRequiredService<IAdminClientService>(); var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
// 可以根据配置文件TaskThreadCount来配置线程数
//int partitionCount = sub.Attribute!.TaskCount == -1 ? topicCount : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; int partitionCount = sub.Attribute!.TaskCount == -1 ? (kafkaOptionConfig.TaskThreadCount==-1? topicCount: kafkaOptionConfig.TaskThreadCount) : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
partitionCount = partitionCount > topicCount ? topicCount : partitionCount; partitionCount = partitionCount > topicCount ? topicCount : partitionCount;
//partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
@ -179,7 +208,7 @@ namespace JiShe.CollectBus.Kafka
//foreach (var sub in subscribedMethods) //foreach (var sub in subscribedMethods)
//{ //{
// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; //// //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
// var adminClientService = provider.GetRequiredService<IAdminClientService>(); // var adminClientService = provider.GetRequiredService<IAdminClientService>();
// int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); // int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
@ -206,11 +235,15 @@ namespace JiShe.CollectBus.Kafka
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger) private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{ {
var consumerService = provider.GetRequiredService<IConsumerService>(); var consumerService = provider.GetRequiredService<IConsumerService>();
var kafkaTaskScheduler = provider.GetRequiredService<KafkaTaskScheduler>();
if (attr.EnableBatch) if (attr.EnableBatch)
{ {
Interlocked.Increment(ref _threadStartCount); Interlocked.Increment(ref _threadStartCount);
logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}");
// 扩展独立线程,避免阻塞
kafkaTaskScheduler.WorkerThreadExpansion();
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
{ {
try try
@ -238,6 +271,8 @@ namespace JiShe.CollectBus.Kafka
{ {
Interlocked.Increment(ref _threadStartCount); Interlocked.Increment(ref _threadStartCount);
logger.LogInformation($"kafka开启线程消费:{_threadStartCount}"); logger.LogInformation($"kafka开启线程消费:{_threadStartCount}");
// 扩展独立线程,避免阻塞
kafkaTaskScheduler.WorkerThreadExpansion();
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{ {
try try

View File

@ -18,6 +18,7 @@
"SaslPassword": "lixiao@1980", "SaslPassword": "lixiao@1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"TaskThreadCount": -1,
"ServerTagName": "JiSheCollectBus100", "ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00" "FirstCollectionTime": "2025-04-22 16:07:00"
}, },

View File

@ -15,12 +15,11 @@
"BootstrapServers": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092", "BootstrapServers": "192.168.5.9:29092,192.168.5.9:39092,192.168.5.9:49092",
"EnableFilter": true, "EnableFilter": true,
"EnableAuthorization": false, "EnableAuthorization": false,
"SecurityProtocol": "SaslPlaintext",
"SaslMechanism": "Plain",
"SaslUserName": "lixiao", "SaslUserName": "lixiao",
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"TaskThreadCount": -1,
"FirstCollectionTime": "2025-04-22 16:07:00" "FirstCollectionTime": "2025-04-22 16:07:00"
}, },
"IoTDBOptions": { "IoTDBOptions": {