From 72f8222ec27e7c1ff91834a97d93fca5523c5785 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 13:56:17 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=B6=88=E8=B4=B9=E8=80=85=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E5=A2=9E=E5=8A=A0=E6=89=B9=E9=87=8F=E6=B6=88=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../KafkaSubcribesExtensions.cs | 45 ++++++++++++++----- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 73ea14c..e830dc4 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -138,21 +138,42 @@ namespace JiShe.CollectBus.Kafka private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); - - await consumerService.SubscribeAsync(attr.Topic, async (message) => + + if (attr.EnableBatch) { - try + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { - // 处理消息 - return await ProcessMessageAsync(message, method, subscribe); - } - catch (ConsumeException ex) + try + { + // 处理消息 + return await ProcessMessageAsync(message, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka批量消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }); + } + else + { + await consumerService.SubscribeAsync(attr.Topic, async (message) => { - // 处理消费错误 - logger.LogError($"kafka消费异常:{ex.Message}"); - } - return await Task.FromResult(false); - }); + try + { + // 处理消息 + return await ProcessMessageAsync(message, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }); + } + }