using Confluent.Kafka; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using JiShe.CollectBus.Kafka.Attributes; using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Kafka.AdminClient; namespace JiShe.CollectBus.Kafka.Consumer { public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency { private readonly ILogger _logger; private readonly IConsumer _consumer; public ConsumerService( ILogger logger, IConfiguration configuration) { _logger = logger; var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig") .Get(); _consumer = new ConsumerBuilder(consumerConfig) .SetErrorHandler(OnConsumeError) .Build(); } public void Subscribe(string topic) { _consumer.Subscribe(topic); _logger.LogInformation($"Subscribed to topic: {topic}"); } public ConsumeResult Consume(CancellationToken cancellationToken) { return _consumer.Consume(cancellationToken); } public void Commit(ConsumeResult result) { _consumer.Commit(result); _logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}"); } private void OnConsumeError(IConsumer consumer, Error error) { _logger.LogError($"Kafka consumer error: {error.Reason}"); } public void Dispose() { _consumer?.Close(); _consumer?.Dispose(); } } }