114 lines
3.9 KiB
C#
Raw Normal View History

2025-04-09 14:33:20 +08:00
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
2025-04-12 15:11:18 +08:00
using static Confluent.Kafka.ConfigPropertyNames;
2025-04-09 14:33:20 +08:00
namespace JiShe.CollectBus.Kafka.Consumer
{
    /// <summary>
    /// Base class that owns a single Confluent.Kafka consumer and drives a consume loop
    /// which hands every received message to a caller-supplied handler.
    /// </summary>
    /// <typeparam name="TKey">Type of the Kafka message key.</typeparam>
    /// <typeparam name="TValue">Type of the Kafka message value.</typeparam>
    public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable
    {
        private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
        private CancellationTokenSource? _cancellationTokenSource;
        private bool _disposed;

        /// <summary>
        /// Stores the logger and immediately builds the consumer from <paramref name="configuration"/>.
        /// </summary>
        /// <param name="configuration">Application configuration containing the <c>Kafka:*</c> keys.</param>
        /// <param name="logger">Logger used to report consume and commit failures.</param>
        protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        /// <summary>
        /// The consumer created by <see cref="GetInstance"/>; all subscribe/consume calls go through it.
        /// </summary>
        public IConsumer<TKey, TValue> Instance { get; set; } = default!;

        /// <summary>
        /// Builds a consumer from the <c>Kafka:*</c> configuration keys, stores it in
        /// <see cref="Instance"/> and returns it.
        /// </summary>
        /// <param name="configuration">
        /// Configuration providing <c>Kafka:EnableAuthorization</c> (required, boolean),
        /// <c>Kafka:BootstrapServers</c>, optionally <c>Kafka:GroupId</c>, and - when authorization
        /// is enabled - <c>Kafka:SaslUserName</c> / <c>Kafka:SaslPassword</c>.
        /// </param>
        /// <returns>The newly built consumer.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="configuration"/> or the <c>Kafka:EnableAuthorization</c> value is null.
        /// </exception>
        /// <exception cref="ArgumentException"><c>Kafka:EnableAuthorization</c> is empty or whitespace.</exception>
        /// <exception cref="FormatException"><c>Kafka:EnableAuthorization</c> is not a valid boolean.</exception>
        public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
        {
            ArgumentNullException.ThrowIfNull(configuration);

            // Read the flag once; the helper is defined on ArgumentException and narrows the value to non-null.
            var enableAuthorizationText = configuration["Kafka:EnableAuthorization"];
            ArgumentException.ThrowIfNullOrWhiteSpace(enableAuthorizationText, "Kafka:EnableAuthorization");
            var enableAuthorization = bool.Parse(enableAuthorizationText);

            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // Auto-commit is disabled: offsets are committed explicitly after the handler succeeds.
                EnableAutoCommit = false,
                // "Acks" is intentionally not set here: it is a producer-only property that a
                // consumer ignores.
            };

            // Confluent.Kafka requires group.id for a subscribing consumer.
            // NOTE(review): the key name "Kafka:GroupId" follows the other Kafka:* keys - confirm it
            // matches the deployment configuration. When the key is absent nothing is set, which
            // keeps the previous behavior.
            var groupId = configuration["Kafka:GroupId"];
            if (!string.IsNullOrWhiteSpace(groupId))
            {
                consumerConfig.GroupId = groupId;
            }

            if (enableAuthorization)
            {
                consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                consumerConfig.SaslMechanism = SaslMechanism.Plain;
                consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
            return Instance;
        }

        /// <summary>
        /// Subscribes to a single topic and processes messages until <see cref="Unsubscribe"/>
        /// or <see cref="Dispose"/> is called.
        /// </summary>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="messageHandler">Callback invoked with the key and value of every message.</param>
        /// <returns>A task that completes when the consume loop has been cancelled.</returns>
        /// <remarks>
        /// <c>Consume</c> is a blocking call, so the calling thread is occupied while the loop
        /// waits for messages.
        /// </remarks>
        public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
        {
            ArgumentNullException.ThrowIfNull(messageHandler);

            var cancellationToken = ResetCancellation();
            Instance.Subscribe(topic);
            await ConsumeLoopAsync(messageHandler, cancellationToken);
        }

        /// <summary>
        /// Subscribes to several topics and processes messages until <see cref="Unsubscribe"/>
        /// or <see cref="Dispose"/> is called.
        /// </summary>
        /// <param name="topics">Topics to subscribe to.</param>
        /// <param name="messageHandler">Callback invoked with the key and value of every message.</param>
        /// <returns>A task that completes when the consume loop has been cancelled.</returns>
        /// <remarks>
        /// <c>Consume</c> is a blocking call, so the calling thread is occupied while the loop
        /// waits for messages.
        /// </remarks>
        public async Task SubscribeAsync(string[] topics, Func<TKey, TValue, Task> messageHandler)
        {
            ArgumentNullException.ThrowIfNull(messageHandler);

            var cancellationToken = ResetCancellation();
            Instance.Subscribe(topics);
            await ConsumeLoopAsync(messageHandler, cancellationToken);
        }

        /// <summary>
        /// Requests cancellation of the running consume loop and removes the current subscription.
        /// </summary>
        public void Unsubscribe()
        {
            try
            {
                _cancellationTokenSource?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The token source was already disposed; there is no loop left to cancel.
            }

            try
            {
                Instance?.Unsubscribe();
            }
            catch (ObjectDisposedException)
            {
                // The consume loop already closed the consumer, which also drops the subscription.
            }
        }

        /// <summary>
        /// Stops consuming and releases the consumer and the cancellation source.
        /// Calling it more than once is a no-op.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Unsubscribe();
            Instance?.Dispose();
            _cancellationTokenSource?.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Replaces the cancellation source with a fresh one, disposes the previous one and
        /// returns the new token.
        /// </summary>
        private CancellationToken ResetCancellation()
        {
            ObjectDisposedException.ThrowIf(_disposed, this);

            var previous = _cancellationTokenSource;
            var replacement = new CancellationTokenSource();
            _cancellationTokenSource = replacement;
            previous?.Dispose();
            return replacement.Token;
        }

        /// <summary>
        /// Consume loop shared by both <c>SubscribeAsync</c> overloads: consume, handle, commit,
        /// and close the consumer once cancellation is observed.
        /// </summary>
        private async Task ConsumeLoopAsync(Func<TKey, TValue, Task> messageHandler, CancellationToken cancellationToken)
        {
            try
            {
                // The token is captured by the caller, so disposing or replacing the source
                // cannot make the loop condition throw.
                while (!cancellationToken.IsCancellationRequested)
                {
                    var result = Instance.Consume(cancellationToken);
                    if (result?.Message is null)
                    {
                        continue;
                    }

                    await messageHandler(result.Message.Key, result.Message.Value);

                    // Commit only after the handler finished without throwing (at-least-once delivery).
                    CommitOffset(result);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Unsubscribe/Dispose cancels the token; fall through to close.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Kafka consume loop terminated by an unexpected error.");
                throw;
            }

            CloseConsumer();
        }

        /// <summary>
        /// Commits the offset of a processed message; a failed commit is logged and not rethrown.
        /// </summary>
        private void CommitOffset(ConsumeResult<TKey, TValue> result)
        {
            try
            {
                Instance.Commit(result);
            }
            catch (KafkaException ex)
            {
                _logger.LogWarning(
                    ex,
                    "Failed to commit offset {TopicPartitionOffset}; the message may be delivered again.",
                    result.TopicPartitionOffset);
            }
        }

        /// <summary>
        /// Closes the consumer, tolerating the case where it has already been disposed.
        /// </summary>
        private void CloseConsumer()
        {
            try
            {
                Instance.Close();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() ran concurrently and already released the consumer.
            }
        }
    }
}