using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
using Serilog;
using System;
using System.Text;

namespace JiShe.CollectBus.Kafka.Consumer
{
    /// <summary>
    /// Creates Kafka consumers and runs background consume loops (single-message and batch)
    /// that hand messages to caller-supplied handlers and commit offsets manually.
    /// </summary>
    /// <remarks>
    /// Consumers are cached per (TKey, TValue) pair. A second subscription with the same type pair
    /// reuses the cached consumer and its cancellation source; <c>Subscribe</c> is called again on it
    /// with the new topic list, and an additional loop is started on the same consumer instance.
    /// NOTE(review): Confluent consumers are not documented as safe for concurrent Consume calls —
    /// confirm callers subscribe at most once per type pair.
    /// </remarks>
    public class ConsumerService : IConsumerService, IDisposable
    {
        private readonly ILogger<ConsumerService> _logger;
        private readonly IConfiguration _configuration;

        // Keyed by typeof(KafkaConsumer<TKey, TValue>); the consumer is stored as object because
        // entries for different generic instantiations share one dictionary.
        private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)> _consumerStore = new();

        // Marker type used only to build a distinct dictionary key per (TKey, TValue) pair.
        private class KafkaConsumer<TKey, TValue>
            where TKey : notnull
            where TValue : class
        {
        }

        /// <summary>
        /// Initializes the service with the application configuration and a logger.
        /// </summary>
        /// <param name="configuration">Source of the <c>Kafka:*</c> and <c>ServerTagName</c> settings.</param>
        /// <param name="logger">Logger used for consumer diagnostics.</param>
        public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
        {
            _configuration = configuration;
            _logger = logger;
        }

        #region Private helpers

        /// <summary>
        /// Builds a consumer with a JSON value deserializer and log/error handlers wired to the logger.
        /// </summary>
        /// <param name="groupId">Consumer group id; <c>null</c> falls back to <c>"default"</c>.</param>
        /// <returns>A new, not yet subscribed consumer.</returns>
        private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            var config = BuildConsumerConfig(groupId);
            return new ConsumerBuilder<TKey, TValue>(config)
                .SetValueDeserializer(new JsonSerializer<TValue>())
                .SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}"))
                .SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
                .Build();
        }

        /// <summary>
        /// Builds the consumer configuration from the <c>Kafka:*</c> configuration keys.
        /// </summary>
        /// <param name="groupId">Consumer group id; <c>null</c> falls back to <c>"default"</c>.</param>
        /// <returns>The populated configuration, with SASL/PLAIN settings when authorization is enabled.</returns>
        private ConsumerConfig BuildConsumerConfig(string? groupId = null)
        {
            var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
            var config = new ConsumerConfig
            {
                BootstrapServers = _configuration["Kafka:BootstrapServers"],
                GroupId = groupId ?? "default",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,            // offsets are committed manually by the loops below
                EnablePartitionEof = true,           // emit an end-of-partition marker result
                AllowAutoCreateTopics = true,        // allow topics to be auto-created
                FetchMaxBytes = 1024 * 1024 * 50     // larger fetch size (50 MB)
            };

            if (enableAuth)
            {
                config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                config.SaslMechanism = SaslMechanism.Plain;
                config.SaslUsername = _configuration["Kafka:SaslUserName"];
                config.SaslPassword = _configuration["Kafka:SaslPassword"];
            }

            return config;
        }

        /// <summary>
        /// Returns the cached consumer for the type pair (creating it on first use) together with the
        /// cancellation token of the entry that is actually stored, so that <see cref="Unsubscribe{TKey, TValue}"/>
        /// and <see cref="Dispose"/> stop every loop running on that consumer.
        /// </summary>
        private (IConsumer<TKey, TValue> Instance, CancellationToken Token) GetOrAddConsumer<TKey, TValue>(string? groupId)
            where TKey : notnull
            where TValue : class
        {
            var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
            var entry = _consumerStore.GetOrAdd(
                consumerKey,
                _ => ((object)CreateConsumer<TKey, TValue>(groupId), new CancellationTokenSource()));

            // The token is captured once here: reading CancellationTokenSource.Token after the
            // source has been disposed throws, whereas an already-obtained token stays usable.
            return ((IConsumer<TKey, TValue>)entry.Consumer, entry.CTS.Token);
        }

        /// <summary>
        /// Checks the message headers against the <c>route-key</c> filter built from the
        /// <c>ServerTagName</c> configuration value.
        /// </summary>
        private bool MatchesRouteKey(Headers headers)
        {
            var headersFilter = new HeadersFilter
            {
                { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
            };
            return headersFilter.Match(headers);
        }

        /// <summary>
        /// Subscribes the consumer for the type pair and starts a background loop that delivers
        /// messages one at a time, committing each message whose handler returns <c>true</c>.
        /// </summary>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageHandler">Handler receiving key and value; returns whether to commit.</param>
        /// <param name="groupId">Consumer group id, used only when the consumer is first created.</param>
        /// <param name="eofDelay">Pause applied after an end-of-partition marker.</param>
        /// <param name="headerMismatchTemplate">Log template (Topic, Partition) used when the header filter rejects a message.</param>
        private void StartConsumeLoop<TKey, TValue>(
            string[] topics,
            Func<TKey, TValue, Task<bool>> messageHandler,
            string? groupId,
            TimeSpan eofDelay,
            string headerMismatchTemplate)
            where TKey : notnull
            where TValue : class
        {
            ArgumentNullException.ThrowIfNull(topics);
            ArgumentNullException.ThrowIfNull(messageHandler);

            var (consumer, token) = GetOrAddConsumer<TKey, TValue>(groupId);
            consumer.Subscribe(topics);

            _ = Task.Run(async () =>
            {
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        var result = consumer.Consume(token);

                        // Must be tested before the null-message check: an end-of-partition
                        // result carries no Message, and committing it throws.
                        if (result is { IsPartitionEOF: true })
                        {
                            _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
                            await Task.Delay(eofDelay, token);
                            continue;
                        }

                        var message = result?.Message;
                        if (message?.Value is null)
                        {
                            _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");

                            // Only a real message with a null value can be committed; a null
                            // result or null Message has no offset to commit.
                            if (result is not null && message is not null)
                            {
                                consumer.Commit(result);
                            }

                            continue;
                        }

                        if (!MatchesRouteKey(message.Headers))
                        {
                            // Skipped without committing, as before.
                            _logger.LogInformation(headerMismatchTemplate, result!.Topic, result.Partition);
                            continue;
                        }

                        var success = await messageHandler(message.Key, message.Value);
                        if (success)
                        {
                            consumer.Commit(result!);
                        }
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancellation requested: the loop condition ends the loop.
                    }
                    catch (Exception ex)
                    {
                        // Keep the loop alive when the handler or commit fails; the message stays uncommitted.
                        _logger.LogError(ex, "处理消息时发生未知错误");
                    }
                }
            });
        }

        /// <summary>
        /// Subscribes the consumer for the type pair and starts a background loop that accumulates
        /// messages into batches and commits the highest offset per partition when the handler returns <c>true</c>.
        /// </summary>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageBatchHandler">Handler receiving the batch values; returns whether to commit.</param>
        /// <param name="groupId">Consumer group id, used only when the consumer is first created.</param>
        /// <param name="batchSize">Maximum number of messages per batch; must be positive.</param>
        /// <param name="batchTimeout">Maximum time spent filling a batch; defaults to 5 seconds.</param>
        /// <param name="headerMismatchTemplate">Log template (Topic, Partition) for rejected messages, or <c>null</c> to skip silently.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
        private void StartBatchLoop<TKey, TValue>(
            string[] topics,
            Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler,
            string? groupId,
            int batchSize,
            TimeSpan? batchTimeout,
            string? headerMismatchTemplate)
            where TKey : notnull
            where TValue : class
        {
            ArgumentNullException.ThrowIfNull(topics);
            ArgumentNullException.ThrowIfNull(messageBatchHandler);
            if (batchSize <= 0)
            {
                // A non-positive size would never fill a batch and the loop would spin without pausing.
                throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
            }

            var (consumer, token) = GetOrAddConsumer<TKey, TValue>(groupId);
            consumer.Subscribe(topics);

            var timeout = batchTimeout ?? TimeSpan.FromSeconds(5);

            _ = Task.Run(async () =>
            {
                var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
                var startTime = DateTime.UtcNow;

                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        // Accumulate with non-blocking polls until the batch is full or the window elapses.
                        while (!token.IsCancellationRequested
                               && messages.Count < batchSize
                               && (DateTime.UtcNow - startTime) < timeout)
                        {
                            var result = consumer.Consume(TimeSpan.Zero);
                            if (result == null)
                            {
                                // Nothing available: wait briefly before polling again.
                                await Task.Delay(10, token);
                                continue;
                            }

                            if (result.IsPartitionEOF)
                            {
                                _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
                                await Task.Delay(TimeSpan.FromSeconds(1), token);
                                continue;
                            }

                            if (result.Message?.Value is not { } value)
                            {
                                continue;
                            }

                            if (!MatchesRouteKey(result.Message.Headers))
                            {
                                // Skipped without committing, as before.
                                if (headerMismatchTemplate is not null)
                                {
                                    _logger.LogInformation(headerMismatchTemplate, result.Topic, result.Partition);
                                }

                                continue;
                            }

                            messages.Add((value, result.TopicPartitionOffset));
                        }

                        if (messages.Count > 0)
                        {
                            // Materialized so the handler never observes the list being cleared below.
                            var batch = messages.Select(m => m.Value).ToList();
                            var success = await messageBatchHandler(batch);
                            if (success)
                            {
                                // Commit "highest processed offset + 1" for every partition in the batch.
                                var offsetsToCommit = messages
                                    .GroupBy(m => m.Offset.TopicPartition)
                                    .Select(g => new TopicPartitionOffset(
                                        g.Key,
                                        new Offset(g.Max(m => m.Offset.Offset.Value) + 1)))
                                    .ToList();
                                consumer.Commit(offsetsToCommit);
                            }

                            // The batch is dropped from memory even when the handler reports failure;
                            // its offsets simply stay uncommitted.
                            messages.Clear();
                        }

                        startTime = DateTime.UtcNow;
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancellation requested: the loop condition ends the loop.
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "处理批量消息时发生未知错误");
                    }
                }
            }, token);
        }

        /// <summary>
        /// Cancels the loops of a store entry and releases its consumer and cancellation source.
        /// </summary>
        private static void StopEntry((object Consumer, CancellationTokenSource CTS) entry)
        {
            entry.CTS.Cancel();
            (entry.Consumer as IDisposable)?.Dispose();
            entry.CTS.Dispose();
        }

        #endregion

        /// <summary>
        /// Subscribes to a single topic and delivers each message's key and value to the handler.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageHandler">Handler; returning <c>true</c> commits the message offset.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
        }

        /// <summary>
        /// Subscribes to a single topic and delivers each message's value to the handler.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageHandler">Handler; returning <c>true</c> commits the message offset.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null)
            where TValue : class
        {
            await SubscribeAsync<TValue>(new[] { topic }, messageHandler, groupId);
        }

        /// <summary>
        /// Subscribes to several topics and delivers each message's key and value to the handler.
        /// The method returns once the background loop has been started.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageHandler">Handler; returning <c>true</c> commits the message offset.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null)
            where TKey : notnull
            where TValue : class
        {
            StartConsumeLoop<TKey, TValue>(
                topics,
                messageHandler,
                groupId,
                TimeSpan.FromSeconds(1),
                "Kafka消费: {Topic} 分区 {Partition} Header未匹配");
            await Task.CompletedTask;
        }

        /// <summary>
        /// Subscribes to several topics and delivers each message's value to the handler.
        /// The method returns once the background loop has been started.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageHandler">Handler; returning <c>true</c> commits the message offset.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId)
            where TValue : class
        {
            ArgumentNullException.ThrowIfNull(messageHandler);

            // NOTE(review): the key type used by the value-only overloads is assumed to be
            // Confluent's Ignore (key bytes are not deserialized) — confirm against the original source.
            StartConsumeLoop<Ignore, TValue>(
                topics,
                (_, value) => messageHandler(value),
                groupId,
                TimeSpan.FromMilliseconds(100),
                "Kafka消费: {Topic} 分区 {Partition} Header值未匹配");
            await Task.CompletedTask;
        }

        /// <summary>
        /// Subscribes to a single topic and delivers messages to the handler in batches.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageBatchHandler">Batch handler; returning <c>true</c> commits the batch offsets.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        /// <param name="batchSize">Maximum number of messages per batch.</param>
        /// <param name="batchTimeout">Maximum time spent filling a batch; defaults to 5 seconds.</param>
        public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
            where TKey : notnull
            where TValue : class
        {
            await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
        }

        /// <summary>
        /// Subscribes to several topics and delivers messages to the handler in batches.
        /// The method returns once the background loop has been started.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageBatchHandler">Batch handler; returning <c>true</c> commits the batch offsets.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        /// <param name="batchSize">Maximum number of messages per batch; must be positive.</param>
        /// <param name="batchTimeout">Maximum time spent filling a batch; defaults to 5 seconds.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
        public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
            where TKey : notnull
            where TValue : class
        {
            StartBatchLoop<TKey, TValue>(
                topics,
                messageBatchHandler,
                groupId,
                batchSize,
                batchTimeout,
                "Kafka消费: {Topic} 分区 {Partition} Header未匹配");
            await Task.CompletedTask;
        }

        /// <summary>
        /// Subscribes to a single topic and delivers message values to the handler in batches.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageBatchHandler">Batch handler; returning <c>true</c> commits the batch offsets.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        /// <param name="batchSize">Maximum number of messages per batch.</param>
        /// <param name="batchTimeout">Maximum time spent filling a batch; defaults to 5 seconds.</param>
        /// <param name="consumeTimeout">Consume wait time. Accepted for interface compatibility; currently not used by the loop.</param>
        public async Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
            where TValue : class
        {
            await SubscribeBatchAsync<TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
        }

        /// <summary>
        /// Subscribes to several topics and delivers message values to the handler in batches.
        /// The method returns once the background loop has been started.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageBatchHandler">Batch handler; returning <c>true</c> commits the batch offsets.</param>
        /// <param name="groupId">Consumer group id; <c>null</c> uses <c>"default"</c>.</param>
        /// <param name="batchSize">Maximum number of messages per batch; must be positive.</param>
        /// <param name="batchTimeout">Maximum time spent filling a batch; defaults to 5 seconds.</param>
        /// <param name="consumeTimeout">Consume wait time. Accepted for interface compatibility; currently not used by the loop.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> is zero or negative.</exception>
        public async Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
            where TValue : class
        {
            // NOTE(review): key type assumed to be Confluent's Ignore, as in the value-only
            // SubscribeAsync overload — confirm against the original source.
            // Header mismatches are skipped without logging in this overload, as before.
            StartBatchLoop<Ignore, TValue>(
                topics,
                messageBatchHandler,
                groupId,
                batchSize,
                batchTimeout,
                headerMismatchTemplate: null);
            await Task.CompletedTask;
        }

        /// <summary>
        /// Stops the loops and disposes the consumer registered for the given type pair, if any.
        /// </summary>
        /// <typeparam name="TKey">Message key type the consumer was created with.</typeparam>
        /// <typeparam name="TValue">Message value type the consumer was created with.</typeparam>
        public void Unsubscribe<TKey, TValue>()
            where TKey : notnull
            where TValue : class
        {
            // Must be the same key shape the subscribe methods store under; a value-tuple
            // type here would never match an entry.
            var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
            if (_consumerStore.TryRemove(consumerKey, out var entry))
            {
                StopEntry(entry);
            }
        }

        /// <summary>
        /// Stops every consume loop and releases all consumers held by this service.
        /// </summary>
        public void Dispose()
        {
            // Remove-then-stop so an entry is never stopped twice if Unsubscribe runs concurrently.
            foreach (var key in _consumerStore.Keys)
            {
                if (_consumerStore.TryRemove(key, out var entry))
                {
                    StopEntry(entry);
                }
            }
        }
    }
}