Compare commits

..

No commits in common. "0584515df9faa9668ba7c97776f286bb187cf558" and "902993e4816ecdc420583ce9351d771d2be9b881" have entirely different histories.

View File

@ -138,42 +138,21 @@ namespace JiShe.CollectBus.Kafka
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
var consumerService = provider.GetRequiredService<IConsumerService>();
if (attr.EnableBatch)
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
{
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
try
{
try
{
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka批量消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
else
{
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
try
{
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}