Optimize handling of Kafka consumers being automatically kicked out of the consumer group; add automatic subscription retry

This commit is contained in:
zenghongyao 2025-04-23 13:59:15 +08:00
parent 9cb8688425
commit a340225cda
7 changed files with 399 additions and 239 deletions

View File

@ -63,6 +63,7 @@ var host = Host.CreateDefaultBuilder(args)
services.AddSingleton<IAdminClientService, AdminClientService>();
services.AddSingleton<IProducerService, ProducerService>();
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddSingleton<KafkaPollyPipeline>();
services.AddTransient<KafkaSubscribeTest>();
})

View File

@ -41,6 +41,9 @@ namespace JiShe.CollectBus.Kafka
// Register the consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>();
// Register the Polly resilience pipeline
context.Services.AddSingleton<KafkaPollyPipeline>();
//context.Services.AddHostedService<HostedService>();
}
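For context, a minimal consumer wired against this registration might look like the sketch below. This is illustrative only: the worker class, topic, group ID, and `TelemetryMessage` type are placeholders, not part of this commit.

// Hypothetical usage of the registered IConsumerService (all names are placeholders):
public class TelemetryWorker
{
    private readonly IConsumerService _consumerService;

    public TelemetryWorker(IConsumerService consumerService) => _consumerService = consumerService;

    public Task StartAsync() =>
        // Returning true from the handler triggers the manual offset commit.
        _consumerService.SubscribeAsync<TelemetryMessage>(
            "telemetry-topic",
            msg => Task.FromResult(true),
            groupId: "telemetry-group");
}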

View File

@ -2,12 +2,14 @@
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
namespace JiShe.CollectBus.Kafka.Consumer
{
@ -20,17 +22,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// </summary>
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
/// <summary>
/// Delay applied when the partition is drained or no messages are available
/// </summary>
private TimeSpan DelayTime => TimeSpan.FromMilliseconds(100);
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
/// <param name="kafkaPollyPipeline"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
}
#region private
@ -99,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
@ -112,58 +123,75 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
    await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
    {
        var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
        var cts = new CancellationTokenSource();
        var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
        (
            CreateConsumer<TKey, TValue>(groupId),
            cts
        )).Consumer as IConsumer<TKey, TValue>;
        consumer!.Subscribe(topics);
        _ = Task.Run(async () =>
        {
            while (!cts.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(cts.Token);
                    if (result == null || result.Message == null || result.Message.Value == null)
                    {
                        await Task.Delay(DelayTime, cts.Token);
                        continue;
                    }
                    if (result.IsPartitionEOF)
                    {
#if DEBUG
                        _logger.LogInformation("Kafka consumer: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
#endif
                        await Task.Delay(DelayTime, cts.Token);
                        continue;
                    }
                    if (_kafkaOptionConfig.EnableFilter)
                    {
                        var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
                        // Skip messages whose headers do not match the filter
                        if (!headersFilter.Match(result.Message.Headers))
                        {
                            continue;
                        }
                    }
                    bool success = await messageHandler(result.Message.Key, result.Message.Value);
                    if (success)
                        consumer.Commit(result); // manual commit
                }
                catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                {
                    _logger.LogError(ex, $"{string.Join("", topics)} message consumption failed: {ex.Error.Reason}");
                    throw; // rethrow so the Polly pipeline can retry
                }
                catch (OperationCanceledException)
                {
                    // ignore
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Unexpected error while processing a message");
                }
            }
        }, cts.Token);
        await Task.CompletedTask;
    });
}
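For illustration, a call into the keyed overload might look like the sketch below; the key/value types, topic names, group ID, and ProcessOrderAsync are assumptions for the example, not part of this commit.

// Illustrative call (types, topics, and ProcessOrderAsync are placeholders):
await consumerService.SubscribeAsync<string, OrderEvent>(
    new[] { "orders", "orders-retry" },
    async (key, order) => await ProcessOrderAsync(key, order), // true => commit the offset
    groupId: "order-processor");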
@ -178,7 +206,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
@ -201,14 +230,16 @@ namespace JiShe.CollectBus.Kafka.Consumer
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka consumer: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
@ -217,7 +248,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
// Check whether the headers match the filter
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // commit the offset
// skip the message
continue;
@ -226,22 +256,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
bool success = await messageHandler(result.Message.Value);
if (success)
    consumer.Commit(result); // manual commit
//else
//    consumer.StoreOffset(result);
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
    _logger.LogError(ex, $"{string.Join("", topics)} message consumption failed: {ex.Error.Reason}");
    throw; // rethrow so the Polly pipeline can retry
}
catch (OperationCanceledException)
{
    // ignore
}
catch (Exception ex)
{
    _logger.LogError(ex, "Unexpected error while processing a message");
}
}
}, cts.Token);
await Task.CompletedTask;
});
}
@ -270,109 +304,114 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="groupId">Consumer group ID</param>
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
    await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
    {
        var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
        var cts = new CancellationTokenSource();
        var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
        (
            CreateConsumer<TKey, TValue>(groupId),
            cts
        )).Consumer as IConsumer<TKey, TValue>;
        consumer!.Subscribe(topics);
        var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // default batch timeout: 5 seconds
        _ = Task.Run(async () =>
        {
            var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
            var startTime = DateTime.UtcNow;
            while (!cts.IsCancellationRequested)
            {
                try
                {
                    // Accumulate messages without blocking
                    while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
                    {
                        var result = consumer.Consume(TimeSpan.Zero); // non-blocking call
                        if (result != null)
                        {
                            if (result.IsPartitionEOF)
                            {
#if DEBUG
                                _logger.LogInformation("Kafka consumer: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
#endif
                                await Task.Delay(DelayTime, cts.Token);
                            }
                            else if (result.Message.Value != null)
                            {
                                if (_kafkaOptionConfig.EnableFilter)
                                {
                                    var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
                                    // Skip messages whose headers do not match the filter
                                    if (!headersFilter.Match(result.Message.Headers))
                                    {
                                        continue;
                                    }
                                }
                                messages.Add((result.Message.Value, result.TopicPartitionOffset));
                            }
                        }
                        else
                        {
                            // No message available; wait briefly
                            await Task.Delay(DelayTime, cts.Token);
                        }
                    }
                    // Process the batch
                    if (messages.Count > 0)
                    {
                        bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
                        if (success)
                        {
                            // Track the highest processed offset per partition
                            var offsetsByPartition = new Dictionary<TopicPartition, long>();
                            foreach (var msg in messages)
                            {
                                var tp = msg.Offset.TopicPartition;
                                var offset = msg.Offset.Offset;
                                if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
                                {
                                    offsetsByPartition[tp] = offset;
                                }
                            }
                            var offsetsToCommit = offsetsByPartition
                                .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
                                .ToList();
                            consumer.Commit(offsetsToCommit);
                        }
                        messages.Clear();
                    }
                    startTime = DateTime.UtcNow;
                }
                catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                {
                    _logger.LogError(ex, $"{string.Join("", topics)} message consumption failed: {ex.Error.Reason}");
                    throw; // rethrow so the Polly pipeline can retry
                }
                catch (OperationCanceledException)
                {
                    // ignore
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Unexpected error while processing a message batch");
                }
            }
        }, cts.Token);
        await Task.CompletedTask;
    });
}
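The commit step above commits, per partition, the highest processed offset plus one (a Kafka commit points at the next message to consume). A standalone sketch of that aggregation with made-up offsets, using Confluent.Kafka types and System.Linq:

// Standalone sketch of the per-partition offset aggregation (made-up data):
var processed = new List<TopicPartitionOffset>
{
    new(new TopicPartition("t", 0), new Offset(5)),
    new(new TopicPartition("t", 0), new Offset(7)),
    new(new TopicPartition("t", 1), new Offset(3)),
};
var toCommit = processed
    .GroupBy(o => o.TopicPartition)
    .Select(g => new TopicPartitionOffset(g.Key, new Offset(g.Max(o => o.Offset.Value) + 1)))
    .ToList();
// Commits t[0] at offset 8 and t[1] at offset 4.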
@ -403,110 +442,113 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="batchSize">Batch size</param>
/// <param name="batchTimeout">Batch timeout</param>
/// <param name="consumeTimeout">Consume wait time</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
    await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
    {
        var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
        var cts = new CancellationTokenSource();
        var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
        (
            CreateConsumer<Ignore, TValue>(groupId),
            cts
        )).Consumer as IConsumer<Ignore, TValue>;
        consumer!.Subscribe(topics);
        var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // default batch timeout: 5 seconds
        _ = Task.Run(async () =>
        {
            var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
            var startTime = DateTime.UtcNow;
            while (!cts.IsCancellationRequested)
            {
                try
                {
                    // Accumulate messages without blocking
                    while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
                    {
                        var result = consumer.Consume(TimeSpan.Zero); // non-blocking call
                        if (result != null)
                        {
                            if (result.IsPartitionEOF)
                            {
                                //_logger.LogInformation("Kafka consumer: topic {Topic} partition {Partition} reached end of partition", result.Topic, result.Partition);
                                await Task.Delay(DelayTime, cts.Token);
                            }
                            else if (result.Message.Value != null)
                            {
                                if (_kafkaOptionConfig.EnableFilter)
                                {
                                    var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
                                    // Skip messages whose headers do not match the filter
                                    if (!headersFilter.Match(result.Message.Headers))
                                    {
                                        continue;
                                    }
                                }
                                messages.Add((result.Message.Value, result.TopicPartitionOffset));
                            }
                        }
                        else
                        {
                            // No message available; wait briefly
                            await Task.Delay(DelayTime, cts.Token);
                        }
                    }
                    // Process the batch
                    if (messages.Count > 0)
                    {
                        bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
                        if (success)
                        {
                            // Track the highest processed offset per partition
                            var offsetsByPartition = new Dictionary<TopicPartition, long>();
                            foreach (var msg in messages)
                            {
                                var tp = msg.Offset.TopicPartition;
                                var offset = msg.Offset.Offset;
                                if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
                                {
                                    offsetsByPartition[tp] = offset;
                                }
                            }
                            var offsetsToCommit = offsetsByPartition
                                .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
                                .ToList();
                            consumer.Commit(offsetsToCommit);
                        }
                        messages.Clear();
                    }
                    startTime = DateTime.UtcNow;
                }
                catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                {
                    _logger.LogError(ex, $"Message consumption failed: {ex.Error.Reason}");
                    throw; // rethrow so the Polly pipeline can retry
                }
                catch (OperationCanceledException)
                {
                    // ignore
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Unexpected error while processing a message batch");
                }
            }
        }, cts.Token);
        await Task.CompletedTask;
    });
}
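A hedged usage sketch for this batch overload; the topic, group, repository sink, and MeterReading type are invented for illustration.

// Illustrative batch subscription (MeterReading and repository are placeholders):
await consumerService.SubscribeBatchAsync<MeterReading>(
    new[] { "meter-readings" },
    async batch =>
    {
        await repository.BulkInsertAsync(batch); // hypothetical sink
        return true; // true => commit each partition's highest offset + 1
    },
    groupId: "meter-ingest",
    batchSize: 200,
    batchTimeout: TimeSpan.FromSeconds(3));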
@ -515,9 +557,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();
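The key-template fix above matters because Unsubscribe must rebuild exactly the key that SubscribeAsync stored; previously the key always used Ignore, so keyed subscriptions could never be found and cancelled. A matching call, with placeholder names:

// Generic arguments must mirror those used at subscribe time,
// otherwise the consumerKey lookup misses and the consumer keeps running:
consumerService.Unsubscribe<string, OrderEvent>(new[] { "orders", "orders-retry" }, "order-processor");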

View File

@ -0,0 +1,111 @@
using Confluent.Kafka;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Polly.Contrib.WaitAndRetry;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Producer;
namespace JiShe.CollectBus.Kafka.Internal
{
public class KafkaPollyPipeline
{
private readonly ILogger<KafkaPollyPipeline> _logger;
public KafkaPollyPipeline(ILogger<KafkaPollyPipeline> logger)
{
_logger = logger;
}
/// <summary>
/// Determines whether an exception is recoverable (worth retrying)
/// </summary>
/// <param name="ex"></param>
/// <returns></returns>
public static bool IsRecoverableError(Exception ex)
{
var errorList = new List<ErrorCode>
{
ErrorCode.GroupLoadInProgress,
ErrorCode.Local_Retry,
ErrorCode.Local_MaxPollExceeded,
ErrorCode.RequestTimedOut,
ErrorCode.LeaderNotAvailable,
ErrorCode.NotLeaderForPartition,
ErrorCode.RebalanceInProgress,
ErrorCode.NotCoordinatorForGroup,
ErrorCode.NetworkException,
ErrorCode.GroupCoordinatorNotAvailable
};
return ex switch
{
ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code),
KafkaException kafkaEx => !kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code), // fatal errors are never retried
_ => false
};
}
/// <summary>
/// Retry + circuit-breaker pipeline. Note: a new pipeline is built on each access, so every subscription gets its own retry and breaker state.
/// </summary>
/// <returns></returns>
public ResiliencePipeline KafkaPipeline
{
get
{
// Compose retry + circuit breaker
ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
ShouldHandle = args => args.Outcome.Exception switch
{
not null when IsRecoverableError(args.Outcome.Exception) =>
PredicateResult.True(),
_ => PredicateResult.False()
},
Delay = TimeSpan.FromSeconds(2),
OnRetry = args =>
{
_logger.LogWarning($"Retrying... attempt {args.AttemptNumber}, reason: {args.Outcome.Exception?.Message}");
return default;
}
})
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
ShouldHandle = args => args.Outcome.Exception switch
{
not null when IsRecoverableError(args.Outcome.Exception) =>
PredicateResult.True(),
_ => PredicateResult.False()
},
FailureRatio = 0.8, // break when 80% of sampled calls fail
SamplingDuration = TimeSpan.FromSeconds(10),
MinimumThroughput = 4, // at least 4 calls before the failure ratio is evaluated
BreakDuration = TimeSpan.FromSeconds(10),
OnOpened = args =>
{
_logger.LogWarning($"Circuit breaker opened; waiting {args.BreakDuration} before the next attempt");
return default;
},
OnClosed = _ =>
{
_logger.LogWarning("Circuit breaker closed; retries resume");
return default;
}
})
.Build();
return pipeline;
}
}
}
}
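Outside the consumer, the same pipeline can wrap any Kafka call. A minimal sketch, assuming a resolved KafkaPollyPipeline instance; DoKafkaWorkAsync is a placeholder, not part of the commit. Recoverable errors are retried with a 2-second delay, and sustained failure opens the breaker for 10 seconds.

// Minimal usage sketch (DoKafkaWorkAsync is a placeholder):
var pipeline = kafkaPollyPipeline.KafkaPipeline;
await pipeline.ExecuteAsync(async token =>
{
    // Throwing a recoverable ConsumeException/KafkaException here
    // triggers the retry strategy; repeated failures open the breaker.
    await DoKafkaWorkAsync(token);
});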

View File

@ -8,6 +8,8 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageReference Include="Polly.Core" Version="8.5.2" />
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup>

View File

@ -50,13 +50,13 @@ namespace JiShe.CollectBus.Protocol
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// Find types implementing the IAnalysisStrategy<,> interface
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
if (!analysisStrategyTypes.Any())
continue;
foreach (var analysisStrategyType in analysisStrategyTypes)
{
// Read static metadata via reflection
var strategyType = analysisStrategyType.Name;
var genericArgs = analysisStrategyType.GetInterface("IAnalysisStrategy`2")!.GetGenericArguments();
var inputType = genericArgs[0];
var resultType = genericArgs[1];
// Register the strategy implementation
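For reference, an invented type shape this scan would discover; only the interface declaration matters to the reflection code above, so members are omitted and the names are hypothetical.

// Invented example; the interface's members are project-specific:
public class HeartbeatAnalysisStrategy : IAnalysisStrategy<byte[], HeartbeatResult> { /* ... */ }

// The scan would then yield:
//   strategyType == "HeartbeatAnalysisStrategy"
//   inputType    == typeof(byte[])
//   resultType   == typeof(HeartbeatResult)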

View File

@ -16,6 +16,7 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
<title>Backend Service</title>
</head>
<body>