Compare commits

..

2 Commits

View File

@ -139,6 +139,25 @@ namespace JiShe.CollectBus.Kafka
{ {
var consumerService = provider.GetRequiredService<IConsumerService>(); var consumerService = provider.GetRequiredService<IConsumerService>();
if (attr.EnableBatch)
{
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
{
try
{
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
else
{
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{ {
try try
@ -155,6 +174,8 @@ namespace JiShe.CollectBus.Kafka
}); });
} }
}
/// <summary> /// <summary>
/// 处理消息 /// 处理消息