using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;

namespace JiShe.CollectBus.Kafka.Consumer
{
    /// <summary>
    /// Base class for Kafka consumers: builds a Confluent.Kafka consumer from configuration,
    /// runs a consume loop that forwards each record to a caller-supplied handler, and
    /// releases the consumer on disposal.
    /// </summary>
    /// <typeparam name="TKey">Type of the Kafka message key.</typeparam>
    /// <typeparam name="TValue">Type of the Kafka message value.</typeparam>
    // NOTE(review): the generic arguments (<TKey, TValue>) were missing from the text this
    // class was reconstructed from and have been restored from usage (the handler receives
    // Message.Key and Message.Value). Confirm against the IConsumerService declaration.
    public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
    {
        private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
        private CancellationTokenSource _cancellationTokenSource;
        private bool _disposed;

        /// <summary>
        /// Stores the logger and immediately builds the consumer from <paramref name="configuration"/>.
        /// </summary>
        /// <param name="configuration">Application configuration containing the <c>Kafka:*</c> keys.</param>
        /// <param name="logger">Logger used for configuration warnings and consume errors.</param>
        protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        /// <summary>
        /// The underlying Kafka consumer. Assigned by <see cref="GetInstance"/>.
        /// </summary>
        public IConsumer<TKey, TValue> Instance { get; set; } = default;

        /// <summary>
        /// Builds a new consumer from the <c>Kafka:BootstrapServers</c> setting, stores it in
        /// <see cref="Instance"/> and returns it. When <c>Kafka:EnableAuthorization</c> is
        /// <c>true</c>, SASL/PLAIN credentials are read from <c>Kafka:SaslUserName</c> and
        /// <c>Kafka:SaslPassword</c>.
        /// </summary>
        /// <param name="configuration">Configuration to read the Kafka settings from.</param>
        /// <returns>The newly built consumer (also assigned to <see cref="Instance"/>).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
        public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
        {
            if (configuration is null)
            {
                throw new ArgumentNullException(nameof(configuration));
            }

            // A missing or malformed flag must not crash construction with an opaque
            // ArgumentNullException/FormatException; treat it as "authorization disabled".
            var rawAuthorizationFlag = configuration["Kafka:EnableAuthorization"];
            if (!bool.TryParse(rawAuthorizationFlag, out var enableAuthorization)
                && !string.IsNullOrWhiteSpace(rawAuthorizationFlag))
            {
                _logger?.LogWarning(
                    "Kafka:EnableAuthorization value '{Value}' is not a valid boolean; authorization is disabled.",
                    rawAuthorizationFlag);
            }

            // NOTE(review): no GroupId is configured here, although Confluent.Kafka consumers
            // normally require group.id before they can be built/subscribed. Confirm where the
            // group id is meant to come from.
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            if (enableAuthorization)
            {
                consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                consumerConfig.SaslMechanism = SaslMechanism.Plain;
                consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
            return Instance;
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/> and forwards every consumed record to
        /// <paramref name="messageHandler"/> until <see cref="Unsubscribe"/> or
        /// <see cref="Dispose"/> cancels the loop. On cancellation the consumer is closed.
        /// </summary>
        /// <remarks>
        /// <c>Consume</c> is a blocking call, so this method occupies the calling thread
        /// between messages; callers that must not block should start it on a background thread.
        /// Non-fatal consume errors are logged and skipped; fatal ones and handler exceptions propagate.
        /// </remarks>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageHandler">Callback invoked with the key and value of each record.</param>
        /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is null.</exception>
        public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
        {
            if (messageHandler is null)
            {
                throw new ArgumentNullException(nameof(messageHandler));
            }

            _cancellationTokenSource = new CancellationTokenSource();

            // Capture the token once: re-reading the field on every iteration would throw
            // ObjectDisposedException after Dispose() and could silently switch to a newer source.
            var cancellationToken = _cancellationTokenSource.Token;

            Instance.Subscribe(topic);
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    ConsumeResult<TKey, TValue> result;
                    try
                    {
                        result = Instance.Consume(cancellationToken);
                    }
                    catch (ConsumeException ex) when (!ex.Error.IsFatal)
                    {
                        // One bad record (e.g. a deserialization failure) should not end the whole loop.
                        _logger?.LogError(ex, "Kafka consume error on topic {Topic}: {Reason}", topic, ex.Error.Reason);
                        continue;
                    }

                    if (result != null)
                    {
                        await messageHandler(result.Message.Key, result.Message.Value);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Instance.Close();
            }
        }

        /// <summary>
        /// Requests cancellation of the consume loop and removes the consumer's current subscription.
        /// </summary>
        public void Unsubscribe()
        {
            _cancellationTokenSource?.Cancel();
            Instance?.Unsubscribe();
        }

        /// <summary>
        /// Stops consuming and releases the consumer and the cancellation source.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                Unsubscribe();
            }
            catch (ObjectDisposedException)
            {
                // The consume loop may already have closed the consumer after cancellation;
                // Dispose must not throw in that case.
            }

            Instance?.Dispose();
            _cancellationTokenSource?.Dispose();
        }
    }
}