diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 73ea14c..e830dc4 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -138,21 +138,42 @@ namespace JiShe.CollectBus.Kafka private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); - - await consumerService.SubscribeAsync(attr.Topic, async (message) => + + if (attr.EnableBatch) { - try + await consumerService.SubscribeBatchAsync(attr.Topic, async (message) => { - // 处理消息 - return await ProcessMessageAsync(message, method, subscribe); - } - catch (ConsumeException ex) + try + { + // 处理消息 + return await ProcessMessageAsync(message, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka批量消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }); + } + else + { + await consumerService.SubscribeAsync(attr.Topic, async (message) => { - // 处理消费错误 - logger.LogError($"kafka消费异常:{ex.Message}"); - } - return await Task.FromResult(false); - }); + try + { + // 处理消息 + return await ProcessMessageAsync(message, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + logger.LogError($"kafka消费异常:{ex.Message}"); + } + return await Task.FromResult(false); + }); + } + }