112 lines
4.0 KiB
C#
112 lines
4.0 KiB
C#
using Confluent.Kafka;
|
|
using Polly.CircuitBreaker;
|
|
using Polly.Retry;
|
|
using Polly;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using Polly.Contrib.WaitAndRetry;
|
|
using Volo.Abp.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using JiShe.CollectBus.Kafka.Producer;
|
|
|
|
namespace JiShe.CollectBus.Kafka.Internal
|
|
{
|
|
public class KafkaPollyPipeline
|
|
{
|
|
|
|
private readonly ILogger<KafkaPollyPipeline> _logger;
|
|
public KafkaPollyPipeline(ILogger<KafkaPollyPipeline> logger)
|
|
{
|
|
_logger= logger;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 判断是否可恢复的异常
|
|
/// </summary>
|
|
/// <param name="ex"></param>
|
|
/// <returns></returns>
|
|
public static bool IsRecoverableError(Exception ex)
|
|
{
|
|
var errorList= new List<ErrorCode>
|
|
{
|
|
ErrorCode.GroupLoadInProgress,
|
|
ErrorCode.Local_Retry,
|
|
ErrorCode.Local_MaxPollExceeded,
|
|
ErrorCode.RequestTimedOut,
|
|
ErrorCode.LeaderNotAvailable,
|
|
ErrorCode.NotLeaderForPartition,
|
|
ErrorCode.RebalanceInProgress,
|
|
ErrorCode.NotCoordinatorForGroup,
|
|
ErrorCode.NetworkException,
|
|
ErrorCode.GroupCoordinatorNotAvailable
|
|
};
|
|
return ex switch
|
|
{
|
|
ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code),
|
|
KafkaException kafkaEx =>kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code),
|
|
_ => false
|
|
};
|
|
|
|
}
|
|
|
|
/// <summary>
|
|
/// 创建重试 + 断路器
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public ResiliencePipeline KafkaPipeline
|
|
{
|
|
get
|
|
{
|
|
// 组合重试 + 断路器
|
|
ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
|
|
.AddRetry(new RetryStrategyOptions
|
|
{
|
|
ShouldHandle = args => args.Outcome.Exception switch
|
|
{
|
|
not null when IsRecoverableError(args.Outcome.Exception) =>
|
|
PredicateResult.True(),
|
|
_ => PredicateResult.False()
|
|
},
|
|
Delay = TimeSpan.FromSeconds(2),
|
|
OnRetry = args =>
|
|
{
|
|
_logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}");
|
|
return default;
|
|
}
|
|
})
|
|
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
|
|
{
|
|
ShouldHandle = args => args.Outcome.Exception switch
|
|
{
|
|
not null when IsRecoverableError(args.Outcome.Exception) =>
|
|
PredicateResult.True(),
|
|
_ => PredicateResult.False()
|
|
},
|
|
FailureRatio = 0.8, // 80% 失败触发熔断
|
|
SamplingDuration = TimeSpan.FromSeconds(10),
|
|
MinimumThroughput = 4, // 至少4次调用才计算失败率
|
|
BreakDuration = TimeSpan.FromSeconds(10),
|
|
OnOpened = args =>
|
|
{
|
|
_logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试");
|
|
return default;
|
|
},
|
|
OnClosed = _ =>
|
|
{
|
|
_logger.LogWarning("熔断器关闭,再次开始重试");
|
|
return default;
|
|
}
|
|
})
|
|
.Build();
|
|
return pipeline;
|
|
}
|
|
|
|
}
|
|
|
|
|
|
}
|
|
}
|