using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;

namespace JiShe.CollectBus.Kafka.Consumer
{
    /// <summary>
    /// Base class for Kafka consumers. Builds an <see cref="IConsumer{TKey, TValue}"/> from
    /// configuration and runs a consume loop that hands each message to a caller-supplied handler.
    /// </summary>
    /// <typeparam name="TKey">Type of the message key.</typeparam>
    /// <typeparam name="TValue">Type of the message value.</typeparam>
    public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable
    {
        private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
        private CancellationTokenSource? _cancellationTokenSource;
        private bool _disposed;

        /// <summary>
        /// Stores the logger and immediately builds the consumer from <paramref name="configuration"/>.
        /// </summary>
        /// <param name="configuration">Application configuration containing the <c>Kafka:*</c> settings.</param>
        /// <param name="logger">Logger for this service.</param>
        protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        /// <summary>
        /// The underlying Kafka consumer. Assigned by <see cref="GetInstance"/>.
        /// </summary>
        public IConsumer<TKey, TValue> Instance { get; set; } = default!;

        /// <summary>
        /// Builds a new consumer from the <c>Kafka:*</c> configuration keys, stores it in
        /// <see cref="Instance"/> and returns it.
        /// </summary>
        /// <param name="configuration">
        /// Configuration providing <c>Kafka:EnableAuthorization</c>, <c>Kafka:BootstrapServers</c> and,
        /// when authorization is enabled, <c>Kafka:SaslUserName</c> / <c>Kafka:SaslPassword</c>.
        /// </param>
        /// <returns>The newly built consumer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
        /// <exception cref="ArgumentException"><c>Kafka:EnableAuthorization</c> is missing or blank.</exception>
        /// <exception cref="FormatException"><c>Kafka:EnableAuthorization</c> is not a valid boolean.</exception>
        public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
        {
            ArgumentNullException.ThrowIfNull(configuration);
            ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
            var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);

            // "acks" is a producer-side setting and is intentionally not set on the consumer config.
            // NOTE(review): no GroupId is configured here; Confluent.Kafka normally requires
            // group.id for subscribing consumers - confirm where it is expected to come from.
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false, // auto-commit is disabled
            };

            if (enableAuthorization)
            {
                consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                consumerConfig.SaslMechanism = SaslMechanism.Plain;
                consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
            return Instance;
        }

        /// <summary>
        /// Subscribes to a single topic and consumes messages until <see cref="Unsubscribe"/> is called.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageHandler">Callback invoked with the key and value of every consumed message.</param>
        /// <returns>A task that completes once the consume loop has stopped and the consumer is closed.</returns>
        public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
        {
            ArgumentNullException.ThrowIfNull(topic);
            ArgumentNullException.ThrowIfNull(messageHandler);

            var cancellationToken = StartNewCancellationScope();
            Instance.Subscribe(topic);
            await ConsumeLoopAsync(messageHandler, cancellationToken);
        }

        /// <summary>
        /// Subscribes to multiple topics and consumes messages until <see cref="Unsubscribe"/> is called.
        /// </summary>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageHandler">Callback invoked with the key and value of every consumed message.</param>
        /// <returns>A task that completes once the consume loop has stopped and the consumer is closed.</returns>
        public async Task SubscribeAsync(string[] topics, Func<TKey, TValue, Task> messageHandler)
        {
            ArgumentNullException.ThrowIfNull(topics);
            ArgumentNullException.ThrowIfNull(messageHandler);

            var cancellationToken = StartNewCancellationScope();
            Instance.Subscribe(topics);
            await ConsumeLoopAsync(messageHandler, cancellationToken);
        }

        /// <summary>
        /// Requests cancellation of the current consume loop (if any) and removes the consumer's
        /// topic subscription.
        /// </summary>
        public void Unsubscribe()
        {
            _cancellationTokenSource?.Cancel();
            Instance?.Unsubscribe();
        }

        /// <summary>
        /// Stops consuming and releases the consumer and the cancellation source.
        /// Calls after the first one are ignored.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Unsubscribe();
            Instance?.Dispose();
            _cancellationTokenSource?.Dispose();
        }

        /// <summary>
        /// Replaces the cancellation source used by <see cref="Unsubscribe"/> and returns its token.
        /// </summary>
        private CancellationToken StartNewCancellationScope()
        {
            var source = new CancellationTokenSource();
            _cancellationTokenSource = source;

            // The token is captured once so the loop never has to touch the source again;
            // reading Token from a source that Dispose() has already disposed would throw.
            return source.Token;
        }

        /// <summary>
        /// Consumes messages and forwards them to <paramref name="messageHandler"/> until
        /// <paramref name="cancellationToken"/> is cancelled, then closes the consumer.
        /// </summary>
        private async Task ConsumeLoopAsync(Func<TKey, TValue, Task> messageHandler, CancellationToken cancellationToken)
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    var result = Instance.Consume(cancellationToken);
                    if (result is not null)
                    {
                        await messageHandler(result.Message.Key, result.Message.Value);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Unsubscribe()/Dispose() cancels the token; fall through to close.
            }

            // Close on both exit paths: cancellation may be observed by the loop condition
            // (e.g. while the handler was running) rather than as an exception from Consume.
            CloseConsumer();
        }

        /// <summary>
        /// Closes the consumer, tolerating the case where it has already been disposed.
        /// </summary>
        private void CloseConsumer()
        {
            try
            {
                Instance.Close();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() cancels the loop and then disposes Instance, possibly on another
                // thread, so the consumer may already be gone by the time we get here.
            }
        }
    }
}