2025-05-23 14:01:33 +08:00

719 lines
34 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using YamlDotNet.Core.Tokens;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger<ConsumerService> _logger;
/// <summary>
/// 消费者存储
/// Key 格式:{groupId}_{topic}_{TKey}_{TValue}
/// </summary>
private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
/// <summary>
/// 消费完或者无数据时的延迟时间
/// </summary>
private static TimeSpan DelayTime => TimeSpan.FromMilliseconds(100);
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly ServerApplicationOptions _applicationOptions;
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
private readonly KafkaTaskScheduler _kafkaTaskScheduler;
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions, KafkaTaskScheduler kafkaTaskScheduler)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
_kafkaTaskScheduler = kafkaTaskScheduler;
}
#region private
/// <summary>
/// 创建消费者
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder<TKey, TValue>(config)
.SetValueDeserializer(new JsonSerializer<TValue>())
.SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}"))
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
.Build();
}
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
var config = new ConsumerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
GroupId = groupId ?? _applicationOptions.ServerTagName,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
};
if (_kafkaOptionConfig.EnableAuthorization)
{
config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _kafkaOptionConfig.SaslPassword;
}
return config;
}
#endregion
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
_= Task.Factory.StartNew(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <param name="groupId"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
_ = Task.Factory.StartNew(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
count++;
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
//else
// consumer.StoreOffset(result);
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TKey">消息Key类型</typeparam>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topic">主题</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
try
{
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TKey">消息Key类型</typeparam>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topics">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<TKey, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Factory.StartNew(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
if (result.IsPartitionEOF)
{
#if DEBUG
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
#endif
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topic">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
try
{
await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topics">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
try
{
// 扩展独立线程,避免阻塞
_kafkaTaskScheduler.WorkerThreadExpansion();
await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
var consumerStore = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<Ignore, TValue>(groupId),
new CancellationTokenSource()
));
if (consumerStore.Consumer == null)
{
_logger.LogWarning($"{string.Join("", topics)}创建消息消费失败或消费组已被释放");
return;
}
var consumer = consumerStore.Consumer as IConsumer<Ignore, TValue>;
var cts = consumerStore.CTS;
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Factory.StartNew(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
if (result.IsPartitionEOF)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(DelayTime, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
}
}
else
{
// 无消息时短暂等待
await Task.Delay(DelayTime, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
throw; // 抛出异常,以便重试
}
catch (OperationCanceledException)
{
//ignore
}
catch (ObjectDisposedException)
{
_logger.LogError($"{string.Join("", topics)}消费者被释放");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token, TaskCreationOptions.None, _kafkaTaskScheduler);
await Task.CompletedTask;
});
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 取消消息订阅
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId) where TKey : notnull where TValue : class
{
try
{
var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
if (_consumerStore.TryGetValue(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
// 从字典中移除
_consumerStore.TryRemove(consumerKey, out entry);
}
}
catch (Exception ex)
{
throw;
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
foreach (var entry in _consumerStore.Values)
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
_consumerStore.Clear();
}
}
}