diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 98e3166..bf752eb 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -183,6 +183,11 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); throw; // 抛出异常,以便重试 } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } catch (OperationCanceledException) { //ignore @@ -403,13 +408,18 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); throw; // 抛出异常,以便重试 } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } catch (OperationCanceledException) { //ignore } catch (Exception ex) { - _logger.LogError(ex, "处理批量消息时发生未知错误"); + _logger.LogError(ex, "处理批量消息时发生未知错误"); } } }, cts.Token); @@ -540,13 +550,18 @@ namespace JiShe.CollectBus.Kafka.Consumer _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); throw; // 抛出异常,以便重试 } + catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) + { + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } catch (OperationCanceledException) { //ignore } catch (Exception ex) { - _logger.LogError(ex, "处理批量消息时发生未知错误"); + _logger.LogError(ex, "处理批量消息时发生未知错误"); } } }, cts.Token); diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs index 9750f86..6f1e4fd 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs @@ -33,7 +33,8 @@ namespace JiShe.CollectBus.Kafka.Internal ErrorCode.RebalanceInProgress, ErrorCode.NotCoordinatorForGroup, ErrorCode.NetworkException, - ErrorCode.GroupCoordinatorNotAvailable + ErrorCode.GroupCoordinatorNotAvailable, + ErrorCode.InvalidGroupId }; return ex switch {