using Confluent.Kafka; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using JiShe.CollectBus.Kafka.Attributes; using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Kafka.AdminClient; using static Confluent.Kafka.ConfigPropertyNames; using System.Collections.Concurrent; using System.Text.RegularExpressions; using NUglify.Html; namespace JiShe.CollectBus.Kafka.Consumer { public class ConsumerService : IConsumerService, IDisposable { private readonly ILogger _logger; private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _consumerStore = new(); public ConsumerService(IConfiguration configuration, ILogger logger) { _configuration = configuration; _logger = logger; } #region private 私有方法 /// /// 创建消费者 /// /// /// /// private IConsumer CreateConsumer(string? groupId = null) where TKey : notnull where TValue : class { var config = BuildConsumerConfig(groupId); return new ConsumerBuilder(config) .SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}")) .Build(); } private ConsumerConfig BuildConsumerConfig(string? groupId = null) { var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!); var config = new ConsumerConfig { BootstrapServers = _configuration["Kafka:BootstrapServers"], GroupId = groupId ?? "default", AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false // 禁止AutoCommit }; if (enableAuth) { config.SecurityProtocol = SecurityProtocol.SaslPlaintext; config.SaslMechanism = SaslMechanism.Plain; config.SaslUsername = _configuration["Kafka:SaslUserName"]; config.SaslPassword = _configuration["Kafka:SaslPassword"]; } return config; } #endregion /// /// 订阅消息 /// /// /// /// /// /// public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { await SubscribeAsync(new[] { topic }, messageHandler, groupId); } /// /// 订阅消息 /// /// /// /// /// public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class { await SubscribeAsync(new[] { topic }, messageHandler,groupId); } /// /// 订阅消息 /// /// /// /// /// /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class { var consumerKey = typeof((TKey, TValue)); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _ => ( CreateConsumer(groupId), cts )).Consumer as IConsumer; consumer!.Subscribe(topics); _ = Task.Run(async () => { while (!cts.IsCancellationRequested) { try { var result = consumer.Consume(cts.Token); bool sucess= await messageHandler(result.Message.Key, result.Message.Value); if (sucess) { consumer.Commit(result); // 手动提交 } } catch (ConsumeException ex) { _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); } } }); await Task.CompletedTask; } /// /// 订阅消息 /// /// /// /// /// /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { var consumerKey = typeof((Ignore, TValue)); var cts = new CancellationTokenSource(); var consumer = _consumerStore.GetOrAdd(consumerKey, _=> ( CreateConsumer(groupId), cts )).Consumer as IConsumer; consumer!.Subscribe(topics); _ = Task.Run(async () => { while (!cts.IsCancellationRequested) { try { var result = consumer.Consume(cts.Token); bool sucess = await messageHandler(result.Message.Value); if (sucess) consumer.Commit(result); // 手动提交 } catch (ConsumeException ex) { _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); } } }); await Task.CompletedTask; } /// /// 取消消息订阅 /// /// /// public void Unsubscribe() where TKey : notnull where TValue : class { var consumerKey = typeof((TKey, TValue)); if (_consumerStore.TryRemove(consumerKey, out var entry)) { entry.CTS.Cancel(); (entry.Consumer as IDisposable)?.Dispose(); entry.CTS.Dispose(); } } /// /// 释放资源 /// public void Dispose() { foreach (var entry in _consumerStore.Values) { entry.CTS.Cancel(); (entry.Consumer as IDisposable)?.Dispose(); entry.CTS.Dispose(); } _consumerStore.Clear(); } } }