kafka优化异常重试

This commit is contained in:
zenghongyao 2025-04-30 15:14:23 +08:00
parent a97421a00e
commit a70cf4b7ce

View File

@ -183,6 +183,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
@ -403,13 +408,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
@ -540,13 +550,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);