using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;

namespace JiShe.CollectBus.Kafka.Consumer
{
    public class ConsumerService : IConsumerService, IDisposable
    {
        private readonly ILogger<ConsumerService> _logger;

        /// <summary>
        /// Consumer store.
        /// Key format: {groupId}_{topic}_{TKey}_{TValue}
        /// </summary>
        private readonly ConcurrentDictionary<string, (object Consumer, CancellationTokenSource CTS)> _consumerStore = new();

        /// <summary>
        /// Delay applied after a batch is drained or when no data is available.
        /// </summary>
        private static TimeSpan DelayTime => TimeSpan.FromMilliseconds(100);

        private readonly KafkaOptionConfig _kafkaOptionConfig;
        private readonly ServerApplicationOptions _applicationOptions;
        private readonly KafkaPollyPipeline _kafkaPollyPipeline;

        /// <summary>
        /// ConsumerService
        /// </summary>
        public ConsumerService(
            ILogger<ConsumerService> logger,
            IOptions<KafkaOptionConfig> kafkaOptionConfig,
            KafkaPollyPipeline kafkaPollyPipeline,
            IOptions<ServerApplicationOptions> applicationOptions)
        {
            _logger = logger;
            _kafkaOptionConfig = kafkaOptionConfig.Value;
            _applicationOptions = applicationOptions.Value;
            _kafkaPollyPipeline = kafkaPollyPipeline;
        }

        #region Private methods

        /// <summary>
        /// Creates a consumer.
        /// </summary>
        private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            var config = BuildConsumerConfig(groupId);
            return new ConsumerBuilder<TKey, TValue>(config)
                .SetValueDeserializer(new JsonSerializer<TValue>())
                .SetLogHandler((_, log) => _logger.LogInformation($"Consumer log: {log.Message}"))
                .SetErrorHandler((_, e) => _logger.LogError($"Consumer error: {e.Reason}"))
                .Build();
        }

        private ConsumerConfig BuildConsumerConfig(string? groupId = null)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = _kafkaOptionConfig.BootstrapServers,
                GroupId = groupId ?? _applicationOptions.ServerTagName,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,          // disable auto-commit; offsets are committed manually
                EnablePartitionEof = true,         // emit partition-EOF markers
                //AllowAutoCreateTopics = true,    // enable automatic topic creation
                FetchMaxBytes = 1024 * 1024 * 50   // enlarge the fetch size (50MB)
            };

            if (_kafkaOptionConfig.EnableAuthorization)
            {
                config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
                config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
                config.SaslUsername = _kafkaOptionConfig.SaslUserName;
                config.SaslPassword = _kafkaOptionConfig.SaslPassword;
            }

            return config;
        }

        #endregion

        /// <summary>
        /// Subscribes to a topic with keyed messages.
        /// </summary>
        public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            await SubscribeAsync(new[] { topic }, messageHandler, groupId);
        }

        /// <summary>
        /// Subscribes to a topic, ignoring message keys.
        /// </summary>
        public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null)
            where TValue : class
        {
            await SubscribeAsync(new[] { topic }, messageHandler, groupId);
        }
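        // Usage sketch (illustrative only; TelemetryData and ProcessAsync are
        // hypothetical): a keyed subscription commits the offset whenever the
        // handler returns true, so a failed message is redelivered later.
        //
        //     await consumerService.SubscribeAsync<string, TelemetryData>(
        //         "telemetry",
        //         async (key, value) =>
        //         {
        //             var ok = await ProcessAsync(key, value); // hypothetical handler
        //             return ok;                               // true => commit offset
        //         },
        //         groupId: "telemetry-group");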
        /// <summary>
        /// Subscribes to multiple topics with keyed messages.
        /// </summary>
        public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
            {
                var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
                var entry = _consumerStore.GetOrAdd(consumerKey, _ => (CreateConsumer<TKey, TValue>(groupId), new CancellationTokenSource()));
                var cts = entry.CTS; // reuse the stored CTS so Unsubscribe can cancel this loop
                var consumer = (IConsumer<TKey, TValue>)entry.Consumer;
                consumer.Subscribe(topics);

                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            var result = consumer.Consume(cts.Token);
                            if (result == null || result.Message == null || result.Message.Value == null)
                            {
                                await Task.Delay(DelayTime, cts.Token);
                                continue;
                            }

                            if (result.IsPartitionEOF)
                            {
#if DEBUG
                                _logger.LogInformation("Kafka consume: topic {Topic}, partition {Partition} fully consumed", result.Topic, result.Partition);
#endif
                                await Task.Delay(DelayTime, cts.Token);
                                continue;
                            }

                            if (_kafkaOptionConfig.EnableFilter)
                            {
                                var headersFilter = new HeadersFilter
                                {
                                    { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
                                };

                                // Skip messages whose headers do not match the filter.
                                if (!headersFilter.Match(result.Message.Headers))
                                {
                                    consumer.Commit(result); // commit the offset and skip
                                    continue;
                                }
                            }

                            bool success = await messageHandler(result.Message.Key, result.Message.Value);
                            if (success)
                                consumer.Commit(result); // manual commit
                        }
                        catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (OperationCanceledException)
                        {
                            // ignore
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Unexpected error while handling a message");
                        }
                    }
                }, cts.Token);

                await Task.CompletedTask;
            });
        }
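        // Producer-side sketch for the EnableFilter path (illustrative; assumes the
        // standard Confluent.Kafka producer API): the filter above only accepts
        // messages whose "route-key" header equals this server's ServerTagName, so
        // a cooperating producer has to attach that header when publishing.
        //
        //     var message = new Message<string, TelemetryData>
        //     {
        //         Key = deviceId,                        // hypothetical key
        //         Value = telemetry,                     // hypothetical payload
        //         Headers = new Headers
        //         {
        //             { "route-key", Encoding.UTF8.GetBytes("collect-bus-01") } // target ServerTagName
        //         }
        //     };
        //     await producer.ProduceAsync("telemetry", message);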
        /// <summary>
        /// Subscribes to multiple topics, ignoring message keys.
        /// </summary>
        public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId)
            where TValue : class
        {
            await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
            {
                var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
                var entry = _consumerStore.GetOrAdd(consumerKey, _ => (CreateConsumer<Ignore, TValue>(groupId), new CancellationTokenSource()));
                var cts = entry.CTS; // reuse the stored CTS so Unsubscribe can cancel this loop
                var consumer = (IConsumer<Ignore, TValue>)entry.Consumer;
                consumer.Subscribe(topics);

                _ = Task.Run(async () =>
                {
                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            var result = consumer.Consume(cts.Token);
                            if (result == null || result.Message == null || result.Message.Value == null)
                            {
                                await Task.Delay(DelayTime, cts.Token);
                                continue;
                            }

                            if (result.IsPartitionEOF)
                            {
#if DEBUG
                                _logger.LogInformation("Kafka consume: topic {Topic}, partition {Partition} fully consumed", result.Topic, result.Partition);
#endif
                                await Task.Delay(DelayTime, cts.Token);
                                continue;
                            }

                            if (_kafkaOptionConfig.EnableFilter)
                            {
                                var headersFilter = new HeadersFilter
                                {
                                    { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
                                };

                                // Skip messages whose headers do not match the filter.
                                if (!headersFilter.Match(result.Message.Headers))
                                {
                                    consumer.Commit(result); // commit the offset and skip
                                    continue;
                                }
                            }

                            bool success = await messageHandler(result.Message.Value);
                            if (success)
                                consumer.Commit(result); // manual commit
                            //else
                            //    consumer.StoreOffset(result);
                        }
                        catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (OperationCanceledException)
                        {
                            // ignore
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Unexpected error while handling a message");
                        }
                    }
                }, cts.Token);

                await Task.CompletedTask;
            });
        }

        /// <summary>
        /// Subscribes to a topic and handles messages in batches.
        /// </summary>
        /// <typeparam name="TKey">Message key type</typeparam>
        /// <typeparam name="TValue">Message value type</typeparam>
        /// <param name="topic">Topic</param>
        /// <param name="messageBatchHandler">Batch message handler</param>
        /// <param name="groupId">Consumer group id</param>
        /// <param name="batchSize">Batch size</param>
        /// <param name="batchTimeout">Batch timeout</param>
        public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
            where TKey : notnull
            where TValue : class
        {
            await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
        }
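        // Usage sketch for batch consumption (illustrative; SaveBatchAsync is
        // hypothetical): the handler receives up to batchSize values collected
        // within batchTimeout, and returning true commits the whole batch.
        //
        //     await consumerService.SubscribeBatchAsync<string, TelemetryData>(
        //         "telemetry",
        //         async batch =>
        //         {
        //             await SaveBatchAsync(batch); // e.g. one bulk insert per batch
        //             return true;                 // true => commit all offsets
        //         },
        //         groupId: "telemetry-group",
        //         batchSize: 500,
        //         batchTimeout: TimeSpan.FromSeconds(2));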
        /// <summary>
        /// Subscribes to multiple topics and handles messages in batches.
        /// </summary>
        /// <typeparam name="TKey">Message key type</typeparam>
        /// <typeparam name="TValue">Message value type</typeparam>
        /// <param name="topics">Topic list</param>
        /// <param name="messageBatchHandler">Batch message handler</param>
        /// <param name="groupId">Consumer group id</param>
        /// <param name="batchSize">Batch size</param>
        /// <param name="batchTimeout">Batch timeout</param>
        public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
            where TKey : notnull
            where TValue : class
        {
            await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
            {
                var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
                var entry = _consumerStore.GetOrAdd(consumerKey, _ => (CreateConsumer<TKey, TValue>(groupId), new CancellationTokenSource()));
                var cts = entry.CTS; // reuse the stored CTS so Unsubscribe can cancel this loop
                var consumer = (IConsumer<TKey, TValue>)entry.Consumer;
                consumer.Subscribe(topics);

                var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // default batch timeout: 5 seconds

                _ = Task.Run(async () =>
                {
                    var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
                    var startTime = DateTime.UtcNow;

                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Accumulate messages quickly without blocking.
                            while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
                            {
                                var result = consumer.Consume(TimeSpan.Zero); // non-blocking poll
                                if (result != null)
                                {
                                    if (result.IsPartitionEOF)
                                    {
#if DEBUG
                                        _logger.LogInformation("Kafka consume: topic {Topic}, partition {Partition} fully consumed", result.Topic, result.Partition);
#endif
                                        await Task.Delay(DelayTime, cts.Token);
                                    }
                                    else if (result.Message.Value != null)
                                    {
                                        if (_kafkaOptionConfig.EnableFilter)
                                        {
                                            var headersFilter = new HeadersFilter
                                            {
                                                { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
                                            };

                                            // Skip messages whose headers do not match the filter.
                                            if (!headersFilter.Match(result.Message.Headers))
                                            {
                                                consumer.Commit(result); // commit the offset and skip
                                                continue;
                                            }
                                        }

                                        messages.Add((result.Message.Value, result.TopicPartitionOffset));
                                    }
                                }
                                else
                                {
                                    // Wait briefly when there is no message.
                                    await Task.Delay(DelayTime, cts.Token);
                                }
                            }

                            // Process the batch.
                            if (messages.Count > 0)
                            {
                                bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
                                if (success)
                                {
                                    // Track the highest offset seen per partition.
                                    var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
                                    foreach (var msg in messages)
                                    {
                                        var tp = msg.Offset.TopicPartition;
                                        var offset = msg.Offset.Offset;
                                        if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
                                        {
                                            offsetsByPartition[tp] = offset;
                                        }
                                    }

                                    // Commit max + 1, i.e. the next offset to read.
                                    var offsetsToCommit = offsetsByPartition
                                        .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
                                        .ToList();

                                    consumer.Commit(offsetsToCommit);
                                }

                                messages.Clear();
                            }

                            startTime = DateTime.UtcNow;
                        }
                        catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (OperationCanceledException)
                        {
                            // ignore
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Unexpected error while handling a message batch");
                        }
                    }
                }, cts.Token);

                await Task.CompletedTask;
            });
        }

        /// <summary>
        /// Subscribes to a topic and handles messages in batches, ignoring keys.
        /// </summary>
        /// <typeparam name="TValue">Message value type</typeparam>
        /// <param name="topic">Topic</param>
        /// <param name="messageBatchHandler">Batch message handler</param>
        /// <param name="groupId">Consumer group id</param>
        /// <param name="batchSize">Batch size</param>
        /// <param name="batchTimeout">Batch timeout</param>
        /// <param name="consumeTimeout">Poll wait time</param>
        public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
            where TValue : class
        {
            await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
        }
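        // Worked example of the per-partition commit above (illustrative): if a
        // batch contains offsets 3, 5 and 4 from partition 0 and offset 7 from
        // partition 1, the loop keeps the maximum per partition (5 and 7) and
        // commits max + 1 (6 and 8), the next offset to read, which is the
        // convention Kafka's Commit API expects.
        //
        //     consumer.Commit(new[]
        //     {
        //         new TopicPartitionOffset(new TopicPartition("telemetry", new Partition(0)), new Offset(6)),
        //         new TopicPartitionOffset(new TopicPartition("telemetry", new Partition(1)), new Offset(8)),
        //     });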
        /// <summary>
        /// Subscribes to multiple topics and handles messages in batches, ignoring keys.
        /// </summary>
        /// <typeparam name="TValue">Message value type</typeparam>
        /// <param name="topics">Topic list</param>
        /// <param name="messageBatchHandler">Batch message handler</param>
        /// <param name="groupId">Consumer group id</param>
        /// <param name="batchSize">Batch size</param>
        /// <param name="batchTimeout">Batch timeout</param>
        /// <param name="consumeTimeout">Poll wait time</param>
        public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
            where TValue : class
        {
            await _kafkaPollyPipeline.KafkaPipeline.ExecuteAsync(async token =>
            {
                var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(Ignore).Name}_{typeof(TValue).Name}";
                var entry = _consumerStore.GetOrAdd(consumerKey, _ => (CreateConsumer<Ignore, TValue>(groupId), new CancellationTokenSource()));
                var cts = entry.CTS; // reuse the stored CTS so Unsubscribe can cancel this loop
                var consumer = (IConsumer<Ignore, TValue>)entry.Consumer;
                consumer.Subscribe(topics);

                var timeout = batchTimeout ?? TimeSpan.FromSeconds(5);     // default batch timeout: 5 seconds
                var pollTimeout = consumeTimeout ?? TimeSpan.Zero;         // TimeSpan.Zero keeps each poll non-blocking

                _ = Task.Run(async () =>
                {
                    var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
                    var startTime = DateTime.UtcNow;

                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            // Accumulate messages quickly without blocking.
                            while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
                            {
                                var result = consumer.Consume(pollTimeout);
                                if (result != null)
                                {
                                    if (result.IsPartitionEOF)
                                    {
                                        //_logger.LogInformation("Kafka consume: topic {Topic}, partition {Partition} fully consumed", result.Topic, result.Partition);
                                        await Task.Delay(DelayTime, cts.Token);
                                    }
                                    else if (result.Message.Value != null)
                                    {
                                        if (_kafkaOptionConfig.EnableFilter)
                                        {
                                            var headersFilter = new HeadersFilter
                                            {
                                                { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
                                            };

                                            // Skip messages whose headers do not match the filter.
                                            if (!headersFilter.Match(result.Message.Headers))
                                            {
                                                consumer.Commit(result); // commit the offset and skip
                                                continue;
                                            }
                                        }

                                        messages.Add((result.Message.Value, result.TopicPartitionOffset));
                                    }
                                }
                                else
                                {
                                    // Wait briefly when there is no message.
                                    await Task.Delay(DelayTime, cts.Token);
                                }
                            }

                            // Process the batch.
                            if (messages.Count > 0)
                            {
                                bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
                                if (success)
                                {
                                    // Track the highest offset seen per partition.
                                    var offsetsByPartition = new Dictionary<TopicPartition, Offset>();
                                    foreach (var msg in messages)
                                    {
                                        var tp = msg.Offset.TopicPartition;
                                        var offset = msg.Offset.Offset;
                                        if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
                                        {
                                            offsetsByPartition[tp] = offset;
                                        }
                                    }

                                    // Commit max + 1, i.e. the next offset to read.
                                    var offsetsToCommit = offsetsByPartition
                                        .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
                                        .ToList();

                                    consumer.Commit(offsetsToCommit);
                                }

                                messages.Clear();
                            }

                            startTime = DateTime.UtcNow;
                        }
                        catch (ConsumeException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (KafkaException ex) when (KafkaPollyPipeline.IsRecoverableError(ex))
                        {
                            _logger.LogError(ex, $"{string.Join(", ", topics)} message consumption failed: {ex.Error.Reason}");
                            throw; // rethrow so the pipeline can retry
                        }
                        catch (OperationCanceledException)
                        {
                            // ignore
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex, "Unexpected error while handling a message batch");
                        }
                    }
                }, cts.Token);

                await Task.CompletedTask;
            });
        }

        /// <summary>
        /// Cancels a subscription.
        /// </summary>
        public void Unsubscribe<TKey, TValue>(string[] topics, string? groupId)
            where TKey : notnull
            where TValue : class
        {
            var consumerKey = $"{groupId}_{string.Join("_", topics)}_{typeof(TKey).Name}_{typeof(TValue).Name}";
            if (_consumerStore.TryRemove(consumerKey, out var entry))
            {
                entry.CTS.Cancel();
                (entry.Consumer as IDisposable)?.Dispose();
                entry.CTS.Dispose();
            }
        }

        /// <summary>
        /// Releases resources.
        /// </summary>
        public void Dispose()
        {
            foreach (var entry in _consumerStore.Values)
            {
                entry.CTS.Cancel();
                (entry.Consumer as IDisposable)?.Dispose();
                entry.CTS.Dispose();
            }

            _consumerStore.Clear();
        }
    }
}
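// Registration sketch (illustrative; the configuration section names and the
// exact wiring used by this project are assumptions):
//
//     services.Configure<KafkaOptionConfig>(configuration.GetSection("Kafka"));
//     services.Configure<ServerApplicationOptions>(configuration.GetSection("Application"));
//     services.AddSingleton<KafkaPollyPipeline>();
//     services.AddSingleton<IConsumerService, ConsumerService>();
//
// ConsumerService owns one consumer and one CancellationTokenSource per
// (groupId, topics, TKey, TValue) combination, so it should be registered as a
// singleton and disposed on shutdown to stop the polling loops.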