using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;

namespace JiShe.CollectBus.Kafka.Internal
{
    /// <summary>
    /// Builds the Polly resilience pipeline (retry + circuit breaker) used around Kafka
    /// operations, and classifies which Kafka errors are worth retrying.
    /// </summary>
    public class KafkaPollyPipeline
    {
        /// <summary>
        /// Kafka error codes that are treated as recoverable.
        /// Kept in a static set so it is allocated once and looked up by hash.
        /// </summary>
        private static readonly HashSet<ErrorCode> RecoverableErrorCodes = new HashSet<ErrorCode>
        {
            ErrorCode.GroupLoadInProgress,
            ErrorCode.Local_Retry,
            ErrorCode.Local_MaxPollExceeded,
            ErrorCode.RequestTimedOut,
            ErrorCode.LeaderNotAvailable,
            ErrorCode.NotLeaderForPartition,
            ErrorCode.RebalanceInProgress,
            ErrorCode.NotCoordinatorForGroup,
            ErrorCode.NetworkException,
            ErrorCode.GroupCoordinatorNotAvailable,
            ErrorCode.InvalidGroupId,
            ErrorCode.IllegalGeneration
        };

        // NOTE(review): generic type arguments appear to have been lost from the text this
        // file was recovered from; confirm whether this was ILogger<KafkaPollyPipeline>.
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the pipeline factory.
        /// </summary>
        /// <param name="logger">Logger that receives retry and circuit-breaker notifications.</param>
        public KafkaPollyPipeline(ILogger logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Determines whether an exception represents a recoverable Kafka error.
        /// </summary>
        /// <param name="ex">The exception to classify.</param>
        /// <returns>
        /// <c>true</c> when <paramref name="ex"/> is a <see cref="ConsumeException"/> whose error
        /// code is in the recoverable set, or any other <see cref="KafkaException"/> that is
        /// not fatal and whose error code is in the recoverable set; otherwise <c>false</c>
        /// (including for <c>null</c> and non-Kafka exceptions).
        /// </returns>
        public static bool IsRecoverableError(Exception ex)
        {
            return ex switch
            {
                // ConsumeException derives from KafkaException, so it must be matched first.
                ConsumeException consumeEx => RecoverableErrorCodes.Contains(consumeEx.Error.Code),

                // A fatal error can never be recovered by retrying, so only non-fatal
                // errors are eligible (the previous check required IsFatal, which was inverted).
                KafkaException kafkaEx => !kafkaEx.Error.IsFatal
                                          && RecoverableErrorCodes.Contains(kafkaEx.Error.Code),

                _ => false
            };
        }

        /// <summary>
        /// Creates a resilience pipeline that combines a retry strategy with a circuit breaker.
        /// Both strategies react only to exceptions accepted by <see cref="IsRecoverableError"/>.
        /// </summary>
        /// <remarks>
        /// Every read of this property builds a new pipeline, so circuit-breaker state is not
        /// shared between pipelines obtained from separate reads.
        /// NOTE(review): callers that want the breaker to accumulate failures across calls
        /// presumably need to keep and reuse the returned instance - confirm against call sites.
        /// </remarks>
        public ResiliencePipeline KafkaPipeline
        {
            get
            {
                // Retry is added first and the circuit breaker second.
                return new ResiliencePipelineBuilder()
                    .AddRetry(new RetryStrategyOptions
                    {
                        ShouldHandle = args =>
                            args.Outcome.Exception is { } error && IsRecoverableError(error)
                                ? PredicateResult.True()
                                : PredicateResult.False(),
                        // Only the base delay is set here; other retry options keep Polly's defaults.
                        Delay = TimeSpan.FromSeconds(2),
                        OnRetry = args =>
                        {
                            _logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}");
                            return default;
                        }
                    })
                    .AddCircuitBreaker(new CircuitBreakerStrategyOptions
                    {
                        ShouldHandle = args =>
                            args.Outcome.Exception is { } error && IsRecoverableError(error)
                                ? PredicateResult.True()
                                : PredicateResult.False(),
                        FailureRatio = 0.8, // an 80% failure ratio trips the breaker
                        SamplingDuration = TimeSpan.FromSeconds(10),
                        MinimumThroughput = 4, // at least 4 calls before the failure ratio is evaluated
                        BreakDuration = TimeSpan.FromSeconds(10),
                        OnOpened = args =>
                        {
                            _logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试");
                            return default;
                        },
                        OnClosed = _ =>
                        {
                            _logger.LogWarning("熔断器关闭,再次开始重试");
                            return default;
                        }
                    })
                    .Build();
            }
        }
    }
}