kafka消费者订阅增加批量消费

This commit is contained in:
zenghongyao 2025-04-17 13:56:17 +08:00
parent 4c5f7231bf
commit 72f8222ec2

View File

@ -139,20 +139,41 @@ namespace JiShe.CollectBus.Kafka
{ {
var consumerService = provider.GetRequiredService<IConsumerService>(); var consumerService = provider.GetRequiredService<IConsumerService>();
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) => if (attr.EnableBatch)
{ {
try await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
{ {
// 处理消息 try
return await ProcessMessageAsync(message, method, subscribe); {
} // 处理消息
catch (ConsumeException ex) return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
else
{
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{ {
// 处理消费错误 try
logger.LogError($"kafka消费异常:{ex.Message}"); {
} // 处理消息
return await Task.FromResult(false); return await ProcessMessageAsync(message, method, subscribe);
}); }
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
} }