diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs index 016d0c4..3c99810 100644 --- a/modules/JiShe.CollectBus.Kafka.Test/Program.cs +++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs @@ -63,6 +63,7 @@ var host = Host.CreateDefaultBuilder(args) services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddTransient(); }) diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index 5621b3d..d31b9ed 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -41,6 +41,9 @@ namespace JiShe.CollectBus.Kafka // 注册Consumer context.Services.AddSingleton(); + // 注册Polly + context.Services.AddSingleton(); + //context.Services.AddHostedService(); } diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 32df748..d4a62db 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -2,12 +2,14 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; +using Microsoft.AspNetCore.DataProtection.KeyManagement; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; using System.Text.RegularExpressions; +using System.Threading; namespace JiShe.CollectBus.Kafka.Consumer { @@ -20,17 +22,26 @@ namespace JiShe.CollectBus.Kafka.Consumer /// private readonly ConcurrentDictionary _consumerStore = new(); + + /// + /// 消费完或者无数据时的延迟时间 + /// + private TimeSpan DelayTime => TimeSpan.FromMilliseconds(100); + private readonly KafkaOptionConfig _kafkaOptionConfig; + private readonly KafkaPollyPipeline _kafkaPollyPipeline; + /// /// ConsumerService /// /// /// - public ConsumerService(ILogger logger, IOptions kafkaOptionConfig) + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline) { _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _kafkaPollyPipeline = kafkaPollyPipeline; } #region private 私有方法 @@ -99,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class { - await SubscribeAsync(new[] { topic }, messageHandler,groupId); + await SubscribeAsync(new[] { topic }, messageHandler, groupId); } /// @@ -112,58 +123,75 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - consumer!.Subscribe(topics); - - await Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - while (!cts.IsCancellationRequested) - { - try - { - //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); - var result = consumer.Consume(cts.Token); - if (result == null || result.Message==null || result.Message.Value == null) - continue; - - if (result.IsPartitionEOF) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + _ = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + try { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); - continue; - } - if (_kafkaOptionConfig.EnableFilter) - { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息...."); + + var result = consumer.Consume(cts.Token); + if (result == null || result.Message == null || result.Message.Value == null) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 + await Task.Delay(DelayTime, cts.Token); continue; } + if (result.IsPartitionEOF) + { +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + continue; + } + if (_kafkaOptionConfig.EnableFilter) + { + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + bool sucess = await messageHandler(result.Message.Key, result.Message.Value); + if (sucess) + consumer.Commit(result); // 手动提交 } - bool sucess= await messageHandler(result.Message.Key, result.Message.Value); - if (sucess) + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - consumer.Commit(result); // 手动提交 + _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); - } - } + }, cts.Token); + await Task.CompletedTask; }); - await Task.CompletedTask; + } @@ -178,7 +206,8 @@ namespace JiShe.CollectBus.Kafka.Consumer /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { - try { + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => + { var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _ => @@ -201,14 +230,16 @@ namespace JiShe.CollectBus.Kafka.Consumer var result = consumer.Consume(cts.Token); if (result == null || result.Message == null || result.Message.Value == null) { - await Task.Delay(500, cts.Token); + await Task.Delay(DelayTime, cts.Token); continue; } if (result.IsPartitionEOF) { +#if DEBUG _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(100, cts.Token); +#endif + await Task.Delay(DelayTime, cts.Token); continue; } if (_kafkaOptionConfig.EnableFilter) @@ -217,7 +248,6 @@ namespace JiShe.CollectBus.Kafka.Consumer // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { - await Task.Delay(500, cts.Token); //consumer.Commit(result); // 提交偏移量 // 跳过消息 continue; @@ -226,22 +256,26 @@ namespace JiShe.CollectBus.Kafka.Consumer bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 - else - consumer.StoreOffset(result); + //else + // consumer.StoreOffset(result); } - catch (ConsumeException ex) + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { _logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 + } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理消息时发生未知错误"); } } - }); - } catch (Exception ex) - { - _logger.LogWarning($"Kafka消费异常: {ex.Message}"); - - } - - await Task.CompletedTask; + }, cts.Token); + await Task.CompletedTask; + }); } @@ -270,109 +304,114 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费组ID /// 批次大小 /// 批次超时时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class + public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { - //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(10, cts.Token); - } - else if (result.Message.Value != null) - { - if (_kafkaOptionConfig.EnableFilter) + if (result.IsPartitionEOF) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) +#if DEBUG + _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); +#endif + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(DelayTime, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - //messages.Add(result.Message.Value); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(10, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}"); - } - catch (OperationCanceledException) - { - // 任务取消,正常退出 - - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); - - await Task.CompletedTask; + await Task.CompletedTask; + }); } @@ -403,110 +442,113 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 批次大小 /// 批次超时时间 /// 消费等待时间 - public async Task SubscribeBatchAsync(string[] topics,Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class + public async Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; - var cts = new CancellationTokenSource(); - - var consumer = _consumerStore.GetOrAdd(consumerKey, _ => - ( - CreateConsumer(groupId), - cts - )).Consumer as IConsumer; - - consumer!.Subscribe(topics); - - var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 - - _ = Task.Run(async () => + await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token => { - var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); - //var messages = new List>(); - var startTime = DateTime.UtcNow; - while (!cts.IsCancellationRequested) + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒 + + _ = Task.Run(async () => { - try - { - // 非阻塞快速累积消息 - while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) - { - var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + var messages = new List<(TValue Value, TopicPartitionOffset Offset)>(); + var startTime = DateTime.UtcNow; - if (result != null) + while (!cts.IsCancellationRequested) + { + try + { + // 非阻塞快速累积消息 + while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout) { - if (result.IsPartitionEOF) + var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用 + + if (result != null) { - //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); - await Task.Delay(10, cts.Token); - } - else if (result.Message.Value != null) - { - if (_kafkaOptionConfig.EnableFilter) + if (result.IsPartitionEOF) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + //_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition); + await Task.Delay(DelayTime, cts.Token); + } + else if (result.Message.Value != null) + { + if (_kafkaOptionConfig.EnableFilter) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } + } + messages.Add((result.Message.Value, result.TopicPartitionOffset)); + } + } + else + { + // 无消息时短暂等待 + await Task.Delay(DelayTime, cts.Token); + } + } + + // 处理批次 + if (messages.Count > 0) + { + bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); + if (success) + { + var offsetsByPartition = new Dictionary(); + foreach (var msg in messages) + { + var tp = msg.Offset.TopicPartition; + var offset = msg.Offset.Offset; + if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) + { + offsetsByPartition[tp] = offset; } } - messages.Add((result.Message.Value, result.TopicPartitionOffset)); - //messages.Add(result.Message.Value); - } - } - else - { - // 无消息时短暂等待 - await Task.Delay(10, cts.Token); - } - } - // 处理批次 - if (messages.Count > 0) + var offsetsToCommit = offsetsByPartition + .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) + .ToList(); + consumer.Commit(offsetsToCommit); + } + messages.Clear(); + } + + startTime = DateTime.UtcNow; + } + catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex)) { - bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList()); - if (success) - { - var offsetsByPartition = new Dictionary(); - foreach (var msg in messages) - { - var tp = msg.Offset.TopicPartition; - var offset = msg.Offset.Offset; - if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax) - { - offsetsByPartition[tp] = offset; - } - } - - var offsetsToCommit = offsetsByPartition - .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1))) - .ToList(); - consumer.Commit(offsetsToCommit); - } - messages.Clear(); + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + throw; // 抛出异常,以便重试 } + catch (OperationCanceledException) + { + //ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "处理批量消息时发生未知错误"); + } + } + }, cts.Token); - startTime = DateTime.UtcNow; - } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); - } - catch (OperationCanceledException) - { - // 任务取消,正常退出 - } - catch (Exception ex) - { - _logger.LogError(ex, "处理批量消息时发生未知错误"); - } - } - }, cts.Token); - - await Task.CompletedTask; + await Task.CompletedTask; + }); } @@ -515,9 +557,9 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public void Unsubscribe(string[] topics, string groupId) where TKey : notnull where TValue : class + public void Unsubscribe(string[] topics, string? groupId) where TKey : notnull where TValue : class { - var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}"; + var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}"; if (_consumerStore.TryRemove(consumerKey, out var entry)) { entry.CTS.Cancel(); diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs new file mode 100644 index 0000000..c467921 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs @@ -0,0 +1,111 @@ +using Confluent.Kafka; +using Polly.CircuitBreaker; +using Polly.Retry; +using Polly; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Polly.Contrib.WaitAndRetry; +using Volo.Abp.DependencyInjection; +using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Kafka.Producer; + +namespace JiShe.CollectBus.Kafka.Internal +{ + public class KafkaPollyPipeline + { + + private readonly ILogger _logger; + public KafkaPollyPipeline(ILogger logger) + { + _logger= logger; + } + + /// + /// 判断是否可恢复的异常 + /// + /// + /// + public static bool IsRecoverableError(Exception ex) + { + var errorList= new List + { + ErrorCode.GroupLoadInProgress, + ErrorCode.Local_Retry, + ErrorCode.Local_MaxPollExceeded, + ErrorCode.RequestTimedOut, + ErrorCode.LeaderNotAvailable, + ErrorCode.NotLeaderForPartition, + ErrorCode.RebalanceInProgress, + ErrorCode.NotCoordinatorForGroup, + ErrorCode.NetworkException, + ErrorCode.GroupCoordinatorNotAvailable + }; + return ex switch + { + ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code), + KafkaException kafkaEx =>kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code), + _ => false + }; + + } + + /// + /// 创建重试 + 断路器 + /// + /// + public ResiliencePipeline KafkaPipeline + { + get + { + // 组合重试 + 断路器 + ResiliencePipeline pipeline = new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions + { + ShouldHandle = args => args.Outcome.Exception switch + { + not null when IsRecoverableError(args.Outcome.Exception) => + PredicateResult.True(), + _ => PredicateResult.False() + }, + Delay = TimeSpan.FromSeconds(2), + OnRetry = args => + { + _logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}"); + return default; + } + }) + .AddCircuitBreaker(new CircuitBreakerStrategyOptions + { + ShouldHandle = args => args.Outcome.Exception switch + { + not null when IsRecoverableError(args.Outcome.Exception) => + PredicateResult.True(), + _ => PredicateResult.False() + }, + FailureRatio = 0.8, // 80% 失败触发熔断 + SamplingDuration = TimeSpan.FromSeconds(10), + MinimumThroughput = 4, // 至少4次调用才计算失败率 + BreakDuration = TimeSpan.FromSeconds(10), + OnOpened = args => + { + _logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试"); + return default; + }, + OnClosed = _ => + { + _logger.LogWarning("熔断器关闭,再次开始重试"); + return default; + } + }) + .Build(); + return pipeline; + } + + } + + + } +} diff --git a/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj b/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj index ce31120..a0fd2f7 100644 --- a/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj +++ b/modules/JiShe.CollectBus.Kafka/JiShe.CollectBus.Kafka.csproj @@ -8,6 +8,8 @@ + + diff --git a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs index 8019d34..842ad39 100644 --- a/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs +++ b/protocols/JiShe.CollectBus.Protocol/JiSheCollectBusProtocolModule.cs @@ -50,13 +50,13 @@ namespace JiShe.CollectBus.Protocol var assembly = existingAssembly ?? Assembly.LoadFrom(file); // 实现IAnalysisStrategy接口 var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>))); - if (analysisStrategyTypes.Count() == 0) + if (!analysisStrategyTypes.Any()) continue; foreach (var analysisStrategyType in analysisStrategyTypes) { // 通过反射获取静态元数据 var strategyType = analysisStrategyType.Name; - var genericArgs = analysisStrategyType.GetInterface("IAnalysisStrategy`2")!.GetGenericArguments(); + var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments(); var inputType = genericArgs[0]; var resultType = genericArgs[1]; // 注册策略实现 diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 88209c2..0193df3 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,6 +16,7 @@ 后端服务 +