Compare commits
2 Commits
42a446c24d
...
9c24f53eb6
| Author | SHA1 | Date | |
|---|---|---|---|
| 9c24f53eb6 | |||
| a340225cda |
@ -63,6 +63,7 @@ var host = Host.CreateDefaultBuilder(args)
|
|||||||
services.AddSingleton<IAdminClientService, AdminClientService>();
|
services.AddSingleton<IAdminClientService, AdminClientService>();
|
||||||
services.AddSingleton<IProducerService, ProducerService>();
|
services.AddSingleton<IProducerService, ProducerService>();
|
||||||
services.AddSingleton<IConsumerService, ConsumerService>();
|
services.AddSingleton<IConsumerService, ConsumerService>();
|
||||||
|
services.AddSingleton<KafkaPollyPipeline>();
|
||||||
services.AddTransient<KafkaSubscribeTest>();
|
services.AddTransient<KafkaSubscribeTest>();
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|||||||
@ -41,6 +41,9 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
// 注册Consumer
|
// 注册Consumer
|
||||||
context.Services.AddSingleton<IConsumerService, ConsumerService>();
|
context.Services.AddSingleton<IConsumerService, ConsumerService>();
|
||||||
|
|
||||||
|
// 注册Polly
|
||||||
|
context.Services.AddSingleton<KafkaPollyPipeline>();
|
||||||
|
|
||||||
//context.Services.AddHostedService<HostedService>();
|
//context.Services.AddHostedService<HostedService>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,12 +2,14 @@
|
|||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Kafka.Internal;
|
using JiShe.CollectBus.Kafka.Internal;
|
||||||
using JiShe.CollectBus.Kafka.Serialization;
|
using JiShe.CollectBus.Kafka.Serialization;
|
||||||
|
using Microsoft.AspNetCore.DataProtection.KeyManagement;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
@ -20,17 +22,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
|
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
|
||||||
_consumerStore = new();
|
_consumerStore = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 消费完或者无数据时的延迟时间
|
||||||
|
/// </summary>
|
||||||
|
private TimeSpan DelayTime => TimeSpan.FromMilliseconds(100);
|
||||||
|
|
||||||
private readonly KafkaOptionConfig _kafkaOptionConfig;
|
private readonly KafkaOptionConfig _kafkaOptionConfig;
|
||||||
|
|
||||||
|
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ConsumerService
|
/// ConsumerService
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="kafkaOptionConfig"></param>
|
/// <param name="kafkaOptionConfig"></param>
|
||||||
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
|
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_kafkaOptionConfig = kafkaOptionConfig.Value;
|
_kafkaOptionConfig = kafkaOptionConfig.Value;
|
||||||
|
_kafkaPollyPipeline = kafkaPollyPipeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
#region private 私有方法
|
#region private 私有方法
|
||||||
@ -99,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
|
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
|
||||||
{
|
{
|
||||||
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
|
await SubscribeAsync<TValue>(new[] { topic }, messageHandler, groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -112,58 +123,75 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
await Task.Run(async () =>
|
|
||||||
{
|
{
|
||||||
while (!cts.IsCancellationRequested)
|
|
||||||
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
(
|
||||||
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
|
cts
|
||||||
|
)).Consumer as IConsumer<TKey, TValue>;
|
||||||
|
|
||||||
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
try
|
while (!cts.IsCancellationRequested)
|
||||||
{
|
{
|
||||||
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
|
try
|
||||||
|
|
||||||
var result = consumer.Consume(cts.Token);
|
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (result.IsPartitionEOF)
|
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
|
||||||
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
|
|
||||||
continue;
|
var result = consumer.Consume(cts.Token);
|
||||||
}
|
if (result == null || result.Message == null || result.Message.Value == null)
|
||||||
if (_kafkaOptionConfig.EnableFilter)
|
|
||||||
{
|
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
|
||||||
// 检查 Header 是否符合条件
|
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
|
||||||
{
|
{
|
||||||
//consumer.Commit(result); // 提交偏移量
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
// 跳过消息
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (result.IsPartitionEOF)
|
||||||
|
{
|
||||||
|
#if DEBUG
|
||||||
|
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
|
#endif
|
||||||
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (_kafkaOptionConfig.EnableFilter)
|
||||||
|
{
|
||||||
|
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
||||||
|
// 检查 Header 是否符合条件
|
||||||
|
if (!headersFilter.Match(result.Message.Headers))
|
||||||
|
{
|
||||||
|
//consumer.Commit(result); // 提交偏移量
|
||||||
|
// 跳过消息
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
|
||||||
|
if (sucess)
|
||||||
|
consumer.Commit(result); // 手动提交
|
||||||
}
|
}
|
||||||
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
|
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
|
||||||
if (sucess)
|
|
||||||
{
|
{
|
||||||
consumer.Commit(result); // 手动提交
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
|
throw; // 抛出异常,以便重试
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "处理消息时发生未知错误");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
}, cts.Token);
|
||||||
{
|
await Task.CompletedTask;
|
||||||
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -178,7 +206,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
try {
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
@ -201,14 +230,16 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message == null || result.Message.Value == null)
|
if (result == null || result.Message == null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
await Task.Delay(500, cts.Token);
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
await Task.Delay(100, cts.Token);
|
#endif
|
||||||
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (_kafkaOptionConfig.EnableFilter)
|
if (_kafkaOptionConfig.EnableFilter)
|
||||||
@ -217,7 +248,6 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
// 检查 Header 是否符合条件
|
// 检查 Header 是否符合条件
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
if (!headersFilter.Match(result.Message.Headers))
|
||||||
{
|
{
|
||||||
await Task.Delay(500, cts.Token);
|
|
||||||
//consumer.Commit(result); // 提交偏移量
|
//consumer.Commit(result); // 提交偏移量
|
||||||
// 跳过消息
|
// 跳过消息
|
||||||
continue;
|
continue;
|
||||||
@ -226,22 +256,26 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
bool sucess = await messageHandler(result.Message.Value);
|
bool sucess = await messageHandler(result.Message.Value);
|
||||||
if (sucess)
|
if (sucess)
|
||||||
consumer.Commit(result); // 手动提交
|
consumer.Commit(result); // 手动提交
|
||||||
else
|
//else
|
||||||
consumer.StoreOffset(result);
|
// consumer.StoreOffset(result);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
|
throw; // 抛出异常,以便重试
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "处理消息时发生未知错误");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}, cts.Token);
|
||||||
} catch (Exception ex)
|
await Task.CompletedTask;
|
||||||
{
|
});
|
||||||
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -270,109 +304,114 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="groupId">消费组ID</param>
|
/// <param name="groupId">消费组ID</param>
|
||||||
/// <param name="batchSize">批次大小</param>
|
/// <param name="batchSize">批次大小</param>
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
/// <param name="batchTimeout">批次超时时间</param>
|
||||||
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<TKey, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<TKey, TValue>;
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
{
|
||||||
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
|
||||||
var startTime = DateTime.UtcNow;
|
|
||||||
|
|
||||||
while (!cts.IsCancellationRequested)
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
(
|
||||||
|
CreateConsumer<TKey, TValue>(groupId),
|
||||||
|
cts
|
||||||
|
)).Consumer as IConsumer<TKey, TValue>;
|
||||||
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
|
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
||||||
|
|
||||||
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
try
|
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
||||||
{
|
var startTime = DateTime.UtcNow;
|
||||||
// 非阻塞快速累积消息
|
|
||||||
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
|
||||||
{
|
|
||||||
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
|
||||||
|
|
||||||
if (result != null)
|
while (!cts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// 非阻塞快速累积消息
|
||||||
|
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
||||||
{
|
{
|
||||||
if (result.IsPartitionEOF)
|
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
||||||
|
|
||||||
|
if (result != null)
|
||||||
{
|
{
|
||||||
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
if (result.IsPartitionEOF)
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
else if (result.Message.Value != null)
|
|
||||||
{
|
|
||||||
if (_kafkaOptionConfig.EnableFilter)
|
|
||||||
{
|
{
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
#if DEBUG
|
||||||
// 检查 Header 是否符合条件
|
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
#endif
|
||||||
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
|
}
|
||||||
|
else if (result.Message.Value != null)
|
||||||
|
{
|
||||||
|
if (_kafkaOptionConfig.EnableFilter)
|
||||||
{
|
{
|
||||||
//consumer.Commit(result); // 提交偏移量
|
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
||||||
// 跳过消息
|
// 检查 Header 是否符合条件
|
||||||
continue;
|
if (!headersFilter.Match(result.Message.Headers))
|
||||||
|
{
|
||||||
|
//consumer.Commit(result); // 提交偏移量
|
||||||
|
// 跳过消息
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 无消息时短暂等待
|
||||||
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理批次
|
||||||
|
if (messages.Count > 0)
|
||||||
|
{
|
||||||
|
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
|
||||||
|
if (success)
|
||||||
|
{
|
||||||
|
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
||||||
|
foreach (var msg in messages)
|
||||||
|
{
|
||||||
|
var tp = msg.Offset.TopicPartition;
|
||||||
|
var offset = msg.Offset.Offset;
|
||||||
|
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
||||||
|
{
|
||||||
|
offsetsByPartition[tp] = offset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
|
||||||
//messages.Add(result.Message.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// 无消息时短暂等待
|
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理批次
|
var offsetsToCommit = offsetsByPartition
|
||||||
if (messages.Count > 0)
|
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
||||||
|
.ToList();
|
||||||
|
consumer.Commit(offsetsToCommit);
|
||||||
|
}
|
||||||
|
messages.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
|
||||||
{
|
{
|
||||||
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
|
_logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
|
||||||
if (success)
|
throw; // 抛出异常,以便重试
|
||||||
{
|
|
||||||
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
|
||||||
foreach (var msg in messages)
|
|
||||||
{
|
|
||||||
var tp = msg.Offset.TopicPartition;
|
|
||||||
var offset = msg.Offset.Offset;
|
|
||||||
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
|
||||||
{
|
|
||||||
offsetsByPartition[tp] = offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var offsetsToCommit = offsetsByPartition
|
|
||||||
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
|
||||||
.ToList();
|
|
||||||
consumer.Commit(offsetsToCommit);
|
|
||||||
}
|
|
||||||
messages.Clear();
|
|
||||||
}
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, cts.Token);
|
||||||
|
|
||||||
startTime = DateTime.UtcNow;
|
await Task.CompletedTask;
|
||||||
}
|
});
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
// 任务取消,正常退出
|
|
||||||
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, cts.Token);
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -403,110 +442,113 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="batchSize">批次大小</param>
|
/// <param name="batchSize">批次大小</param>
|
||||||
/// <param name="batchTimeout">批次超时时间</param>
|
/// <param name="batchTimeout">批次超时时间</param>
|
||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
/// <param name="consumeTimeout">消费等待时间</param>
|
||||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
|
||||||
(
|
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
|
||||||
cts
|
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
|
||||||
|
|
||||||
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
|
||||||
{
|
{
|
||||||
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
|
||||||
//var messages = new List<ConsumeResult<TKey, TValue>>();
|
|
||||||
var startTime = DateTime.UtcNow;
|
|
||||||
|
|
||||||
while (!cts.IsCancellationRequested)
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
|
(
|
||||||
|
CreateConsumer<Ignore, TValue>(groupId),
|
||||||
|
cts
|
||||||
|
)).Consumer as IConsumer<Ignore, TValue>;
|
||||||
|
|
||||||
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
|
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
|
||||||
|
|
||||||
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
try
|
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
|
||||||
{
|
var startTime = DateTime.UtcNow;
|
||||||
// 非阻塞快速累积消息
|
|
||||||
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
|
||||||
{
|
|
||||||
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
|
||||||
|
|
||||||
if (result != null)
|
while (!cts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// 非阻塞快速累积消息
|
||||||
|
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
|
||||||
{
|
{
|
||||||
if (result.IsPartitionEOF)
|
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
|
||||||
|
|
||||||
|
if (result != null)
|
||||||
{
|
{
|
||||||
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
if (result.IsPartitionEOF)
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
else if (result.Message.Value != null)
|
|
||||||
{
|
|
||||||
if (_kafkaOptionConfig.EnableFilter)
|
|
||||||
{
|
{
|
||||||
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
|
||||||
// 检查 Header 是否符合条件
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
if (!headersFilter.Match(result.Message.Headers))
|
}
|
||||||
|
else if (result.Message.Value != null)
|
||||||
|
{
|
||||||
|
if (_kafkaOptionConfig.EnableFilter)
|
||||||
{
|
{
|
||||||
//consumer.Commit(result); // 提交偏移量
|
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
|
||||||
// 跳过消息
|
// 检查 Header 是否符合条件
|
||||||
continue;
|
if (!headersFilter.Match(result.Message.Headers))
|
||||||
|
{
|
||||||
|
//consumer.Commit(result); // 提交偏移量
|
||||||
|
// 跳过消息
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 无消息时短暂等待
|
||||||
|
await Task.Delay(DelayTime, cts.Token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理批次
|
||||||
|
if (messages.Count > 0)
|
||||||
|
{
|
||||||
|
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
|
||||||
|
if (success)
|
||||||
|
{
|
||||||
|
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
||||||
|
foreach (var msg in messages)
|
||||||
|
{
|
||||||
|
var tp = msg.Offset.TopicPartition;
|
||||||
|
var offset = msg.Offset.Offset;
|
||||||
|
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
||||||
|
{
|
||||||
|
offsetsByPartition[tp] = offset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
messages.Add((result.Message.Value, result.TopicPartitionOffset));
|
|
||||||
//messages.Add(result.Message.Value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// 无消息时短暂等待
|
|
||||||
await Task.Delay(10, cts.Token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理批次
|
var offsetsToCommit = offsetsByPartition
|
||||||
if (messages.Count > 0)
|
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
||||||
|
.ToList();
|
||||||
|
consumer.Commit(offsetsToCommit);
|
||||||
|
}
|
||||||
|
messages.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
|
||||||
{
|
{
|
||||||
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
|
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
||||||
if (success)
|
throw; // 抛出异常,以便重试
|
||||||
{
|
|
||||||
var offsetsByPartition = new Dictionary<TopicPartition, long>();
|
|
||||||
foreach (var msg in messages)
|
|
||||||
{
|
|
||||||
var tp = msg.Offset.TopicPartition;
|
|
||||||
var offset = msg.Offset.Offset;
|
|
||||||
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
|
|
||||||
{
|
|
||||||
offsetsByPartition[tp] = offset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var offsetsToCommit = offsetsByPartition
|
|
||||||
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
|
|
||||||
.ToList();
|
|
||||||
consumer.Commit(offsetsToCommit);
|
|
||||||
}
|
|
||||||
messages.Clear();
|
|
||||||
}
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
//ignore
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, cts.Token);
|
||||||
|
|
||||||
startTime = DateTime.UtcNow;
|
await Task.CompletedTask;
|
||||||
}
|
});
|
||||||
catch (ConsumeException ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
|
||||||
}
|
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
// 任务取消,正常退出
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "处理批量消息时发生未知错误");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, cts.Token);
|
|
||||||
|
|
||||||
await Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -515,9 +557,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TKey"></typeparam>
|
/// <typeparam name="TKey"></typeparam>
|
||||||
/// <typeparam name="TValue"></typeparam>
|
/// <typeparam name="TValue"></typeparam>
|
||||||
public void Unsubscribe<TKey, TValue>(string[] topics, string groupId) where TKey : notnull where TValue : class
|
public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
|
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
|
||||||
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
if (_consumerStore.TryRemove(consumerKey, out var entry))
|
||||||
{
|
{
|
||||||
entry.CTS.Cancel();
|
entry.CTS.Cancel();
|
||||||
|
|||||||
111
modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs
Normal file
111
modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
using Confluent.Kafka;
|
||||||
|
using Polly.CircuitBreaker;
|
||||||
|
using Polly.Retry;
|
||||||
|
using Polly;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Polly.Contrib.WaitAndRetry;
|
||||||
|
using Volo.Abp.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Kafka.Internal
|
||||||
|
{
|
||||||
|
public class KafkaPollyPipeline
|
||||||
|
{
|
||||||
|
|
||||||
|
private readonly ILogger<KafkaPollyPipeline> _logger;
|
||||||
|
public KafkaPollyPipeline(ILogger<KafkaPollyPipeline> logger)
|
||||||
|
{
|
||||||
|
_logger= logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断是否可恢复的异常
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="ex"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static bool IsRecoverableError(Exception ex)
|
||||||
|
{
|
||||||
|
var errorList= new List<ErrorCode>
|
||||||
|
{
|
||||||
|
ErrorCode.GroupLoadInProgress,
|
||||||
|
ErrorCode.Local_Retry,
|
||||||
|
ErrorCode.Local_MaxPollExceeded,
|
||||||
|
ErrorCode.RequestTimedOut,
|
||||||
|
ErrorCode.LeaderNotAvailable,
|
||||||
|
ErrorCode.NotLeaderForPartition,
|
||||||
|
ErrorCode.RebalanceInProgress,
|
||||||
|
ErrorCode.NotCoordinatorForGroup,
|
||||||
|
ErrorCode.NetworkException,
|
||||||
|
ErrorCode.GroupCoordinatorNotAvailable
|
||||||
|
};
|
||||||
|
return ex switch
|
||||||
|
{
|
||||||
|
ConsumeException kafkaEx => errorList.Contains(kafkaEx.Error.Code),
|
||||||
|
KafkaException kafkaEx =>kafkaEx.Error.IsFatal && errorList.Contains(kafkaEx.Error.Code),
|
||||||
|
_ => false
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 创建重试 + 断路器
|
||||||
|
/// </summary>
|
||||||
|
/// <returns></returns>
|
||||||
|
public ResiliencePipeline KafkaPipeline
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
// 组合重试 + 断路器
|
||||||
|
ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
|
||||||
|
.AddRetry(new RetryStrategyOptions
|
||||||
|
{
|
||||||
|
ShouldHandle = args => args.Outcome.Exception switch
|
||||||
|
{
|
||||||
|
not null when IsRecoverableError(args.Outcome.Exception) =>
|
||||||
|
PredicateResult.True(),
|
||||||
|
_ => PredicateResult.False()
|
||||||
|
},
|
||||||
|
Delay = TimeSpan.FromSeconds(2),
|
||||||
|
OnRetry = args =>
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"重试中... 第 {args.AttemptNumber} 次,原因: {args.Outcome.Exception?.Message}");
|
||||||
|
return default;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
|
||||||
|
{
|
||||||
|
ShouldHandle = args => args.Outcome.Exception switch
|
||||||
|
{
|
||||||
|
not null when IsRecoverableError(args.Outcome.Exception) =>
|
||||||
|
PredicateResult.True(),
|
||||||
|
_ => PredicateResult.False()
|
||||||
|
},
|
||||||
|
FailureRatio = 0.8, // 80% 失败触发熔断
|
||||||
|
SamplingDuration = TimeSpan.FromSeconds(10),
|
||||||
|
MinimumThroughput = 4, // 至少4次调用才计算失败率
|
||||||
|
BreakDuration = TimeSpan.FromSeconds(10),
|
||||||
|
OnOpened = args =>
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"熔断器开启,等待 {args.BreakDuration} 后重试");
|
||||||
|
return default;
|
||||||
|
},
|
||||||
|
OnClosed = _ =>
|
||||||
|
{
|
||||||
|
_logger.LogWarning("熔断器关闭,再次开始重试");
|
||||||
|
return default;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.Build();
|
||||||
|
return pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -8,6 +8,8 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
<PackageReference Include="Confluent.Kafka" Version="2.9.0" />
|
||||||
|
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
|
||||||
|
<PackageReference Include="Polly.Core" Version="8.5.2" />
|
||||||
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.AspNetCore" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|||||||
@ -50,13 +50,13 @@ namespace JiShe.CollectBus.Protocol
|
|||||||
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
|
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
|
||||||
// 实现IAnalysisStrategy接口
|
// 实现IAnalysisStrategy接口
|
||||||
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
|
var analysisStrategyTypes = assembly.GetTypes().Where(t => !t.IsAbstract && !t.IsInterface && t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAnalysisStrategy<,>)));
|
||||||
if (analysisStrategyTypes.Count() == 0)
|
if (!analysisStrategyTypes.Any())
|
||||||
continue;
|
continue;
|
||||||
foreach (var analysisStrategyType in analysisStrategyTypes)
|
foreach (var analysisStrategyType in analysisStrategyTypes)
|
||||||
{
|
{
|
||||||
// 通过反射获取静态元数据
|
// 通过反射获取静态元数据
|
||||||
var strategyType = analysisStrategyType.Name;
|
var strategyType = analysisStrategyType.Name;
|
||||||
var genericArgs = analysisStrategyType.GetInterface("IAnalysisStrategy`2")!.GetGenericArguments();
|
var genericArgs = analysisStrategyType.GetInterface($"IAnalysisStrategy`2")!.GetGenericArguments();
|
||||||
var inputType = genericArgs[0];
|
var inputType = genericArgs[0];
|
||||||
var resultType = genericArgs[1];
|
var resultType = genericArgs[1];
|
||||||
// 注册策略实现
|
// 注册策略实现
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
|
||||||
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet"/>
|
||||||
<title>后端服务</title>
|
<title>后端服务</title>
|
||||||
|
|
||||||
</head>
|
</head>
|
||||||
|
|
||||||
<body>
|
<body>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user