2025-04-09 14:33:20 +08:00
|
|
|
|
using Confluent.Kafka;
|
|
|
|
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
|
|
using Microsoft.Extensions.Hosting;
|
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.Attributes;
|
|
|
|
|
|
using Volo.Abp.DependencyInjection;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.AdminClient;
|
2025-04-12 15:11:18 +08:00
|
|
|
|
using static Confluent.Kafka.ConfigPropertyNames;
|
2025-04-15 15:49:22 +08:00
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
|
using System.Text.RegularExpressions;
|
|
|
|
|
|
using NUglify.Html;
|
2025-04-16 09:54:21 +08:00
|
|
|
|
using Serilog;
|
2025-04-09 14:33:20 +08:00
|
|
|
|
|
|
|
|
|
|
namespace JiShe.CollectBus.Kafka.Consumer
|
|
|
|
|
|
{
|
2025-04-15 15:49:22 +08:00
|
|
|
|
public class ConsumerService : IConsumerService, IDisposable
|
2025-04-09 14:33:20 +08:00
|
|
|
|
{
|
2025-04-15 15:49:22 +08:00
|
|
|
|
// Logger used by the consumer log/error callbacks and by the consume loops.
private readonly ILogger<ConsumerService> _logger;

// Application configuration; the "Kafka:*" settings are read from it when a consumer is configured.
private readonly IConfiguration _configuration;

// Live consumers keyed by the (TKey, TValue) value-tuple type. Each entry pairs the consumer
// (stored as object because the generic arguments differ per entry) with the
// CancellationTokenSource that is cancelled when the entry is removed or the service is disposed.
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)> _consumerStore =
    new ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>();
|
2025-04-09 14:33:20 +08:00
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
/// <summary>
/// Initializes the service with the configuration source and logger it keeps for later use.
/// </summary>
/// <param name="configuration">Configuration from which the "Kafka:*" settings are read.</param>
/// <param name="logger">Logger used for consumer diagnostics.</param>
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
    (_configuration, _logger) = (configuration, logger);
}
|
2025-04-09 14:33:20 +08:00
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
#region private 私有方法
|
2025-04-09 14:33:20 +08:00
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
/// <summary>
/// Builds a Kafka consumer whose log and error callbacks are forwarded to the service logger.
/// </summary>
/// <typeparam name="TKey">Message key type.</typeparam>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="groupId">Consumer group id handed to <c>BuildConsumerConfig</c>.</param>
/// <returns>The consumer produced by the builder.</returns>
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
{
    var config = BuildConsumerConfig(groupId);

    // Message templates with arguments are used instead of string interpolation so that
    // braces inside Kafka-provided text are never interpreted as template placeholders
    // and the values stay available as structured log properties.
    return new ConsumerBuilder<TKey, TValue>(config)
        .SetLogHandler((_, log) => _logger.LogInformation("消费者Log: {Message}", log.Message))
        .SetErrorHandler((_, e) => _logger.LogError("消费者错误: {Reason}", e.Reason))
        .Build();
}
|
2025-04-09 14:33:20 +08:00
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
/// <summary>
/// Builds the consumer configuration from the "Kafka:*" configuration keys.
/// </summary>
/// <param name="groupId">Consumer group id; "default" is used when <c>null</c>.</param>
/// <returns>
/// A configuration with manual commits, partition-EOF events and topic auto-creation enabled,
/// plus SASL/PLAIN credentials when "Kafka:EnableAuthorization" is true.
/// </returns>
/// <exception cref="FormatException">
/// "Kafka:EnableAuthorization" is present but is not a valid boolean.
/// </exception>
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
    // A missing or blank switch means "authorization disabled" instead of failing with an
    // ArgumentNullException; a malformed value is still rejected, with a descriptive message.
    var enableAuth = false;
    var enableAuthSetting = _configuration["Kafka:EnableAuthorization"];
    if (!string.IsNullOrWhiteSpace(enableAuthSetting) && !bool.TryParse(enableAuthSetting, out enableAuth))
    {
        throw new FormatException(
            $"Configuration value 'Kafka:EnableAuthorization' is not a valid boolean: '{enableAuthSetting}'.");
    }

    var config = new ConsumerConfig
    {
        BootstrapServers = _configuration["Kafka:BootstrapServers"],
        GroupId = groupId ?? "default",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false,     // offsets are committed manually by the consume loops
        EnablePartitionEof = true,    // emit an end-of-partition event
        AllowAutoCreateTopics = true  // allow automatic topic creation
    };

    if (enableAuth)
    {
        config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
        config.SaslMechanism = SaslMechanism.Plain;
        config.SaslUsername = _configuration["Kafka:SaslUserName"];
        config.SaslPassword = _configuration["Kafka:SaslPassword"];
    }

    return config;
}
|
|
|
|
|
|
#endregion
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Subscribes to a single topic, passing both key and value of each message to the handler.
/// Forwards to the multi-topic overload with a one-element topic array.
/// </summary>
/// <typeparam name="TKey">Message key type.</typeparam>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="topic">Topic to subscribe to.</param>
/// <param name="messageHandler">Callback invoked with the key and value of each message.</param>
/// <param name="groupId">Optional consumer group id, forwarded unchanged.</param>
/// <returns>The task of the multi-topic overload.</returns>
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
    string[] singleTopic = { topic };
    await SubscribeAsync<TKey, TValue>(singleTopic, messageHandler, groupId);
}
|
|
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Subscribes to a single topic, passing only the value of each message to the handler.
/// Forwards to the multi-topic overload with a one-element topic array.
/// </summary>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="topic">Topic to subscribe to.</param>
/// <param name="messageHandler">Callback invoked with the value of each message.</param>
/// <param name="groupId">Optional consumer group id, forwarded unchanged.</param>
/// <returns>The task of the multi-topic overload.</returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
    string[] singleTopic = { topic };
    await SubscribeAsync<TValue>(singleTopic, messageHandler, groupId);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Subscribes to the given topics and starts a background loop that passes the key and value
/// of every consumed message to <paramref name="messageHandler"/>.
/// </summary>
/// <typeparam name="TKey">Message key type.</typeparam>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="topics">Topics to subscribe to.</param>
/// <param name="messageHandler">
/// Callback invoked per message; the message offset is committed only when it returns <c>true</c>.
/// </param>
/// <param name="groupId">
/// Consumer group id. It is only used when the consumer for this (TKey, TValue) pair is first
/// created; a later call for the same pair reuses the stored consumer.
/// </param>
/// <returns>A task that completes once the subscription is set up; consumption continues in the background.</returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
    var consumerKey = typeof((TKey, TValue));
    var candidateCts = new CancellationTokenSource();

    var entry = _consumerStore.GetOrAdd(consumerKey, _ =>
    (
        CreateConsumer<TKey, TValue>(groupId),
        candidateCts
    ));

    // When an entry already existed, the stored CTS is the one Unsubscribe/Dispose will cancel,
    // so the loop must observe that one; the unused candidate is released.
    if (!ReferenceEquals(entry.CTS, candidateCts))
    {
        candidateCts.Dispose();
    }

    var consumer = (IConsumer<TKey, TValue>)entry.Consumer;

    // Capture the token once: reading CTS.Token after the source is disposed would throw.
    var token = entry.CTS.Token;

    // NOTE(review): a second call for the same type pair re-subscribes the shared consumer and
    // starts another loop on it, as before — confirm whether that is intended.
    consumer.Subscribe(topics);

    _ = Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(token);
                if (result == null) continue;

                // Must be checked before touching result.Message: end-of-partition events
                // carry no message.
                if (result.IsPartitionEOF)
                {
                    _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
                    await Task.Delay(TimeSpan.FromSeconds(1), token);
                    continue;
                }

                if (result.Message?.Value == null) continue;

                bool success = await messageHandler(result.Message.Key, result.Message.Value);
                if (success)
                {
                    consumer.Commit(result); // manual commit
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "消息消费失败: {Reason}", ex.Error.Reason);
            }
            catch (OperationCanceledException)
            {
                // Cancellation requested through Unsubscribe/Dispose: leave the loop quietly.
                break;
            }
            catch (ObjectDisposedException)
            {
                // The consumer was disposed while the loop was still running.
                break;
            }
            catch (Exception ex)
            {
                // A failing handler or commit must not silently terminate the background loop.
                _logger.LogError(ex, "消息处理失败: {Message}", ex.Message);
            }
        }
    });

    await Task.CompletedTask;
}
|
|
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
|
|
|
|
|
|
|
2025-04-15 11:15:22 +08:00
|
|
|
|
/// <summary>
/// Subscribes to the given topics and starts a background loop that passes the value of every
/// consumed message to <paramref name="messageHandler"/>; message keys are ignored.
/// </summary>
/// <typeparam name="TValue">Message value type.</typeparam>
/// <param name="topics">Topics to subscribe to.</param>
/// <param name="messageHandler">
/// Callback invoked per message; the message offset is committed only when it returns <c>true</c>.
/// </param>
/// <param name="groupId">
/// Consumer group id. It is only used when the consumer for this value type is first created;
/// a later call for the same type reuses the stored consumer. Defaults to <c>null</c>, matching
/// the other overloads.
/// </param>
/// <returns>A task that completes once the subscription is set up; consumption continues in the background.</returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
    var consumerKey = typeof((Ignore, TValue));
    var candidateCts = new CancellationTokenSource();

    var entry = _consumerStore.GetOrAdd(consumerKey, _ =>
    (
        CreateConsumer<Ignore, TValue>(groupId),
        candidateCts
    ));

    // When an entry already existed, the stored CTS is the one Unsubscribe/Dispose will cancel,
    // so the loop must observe that one; the unused candidate is released.
    if (!ReferenceEquals(entry.CTS, candidateCts))
    {
        candidateCts.Dispose();
    }

    var consumer = (IConsumer<Ignore, TValue>)entry.Consumer;

    // Capture the token once: reading CTS.Token after the source is disposed would throw.
    var token = entry.CTS.Token;

    // NOTE(review): a second call for the same value type re-subscribes the shared consumer and
    // starts another loop on it, as before — confirm whether that is intended.
    consumer.Subscribe(topics);

    _ = Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(token);
                if (result == null) continue;

                // Checked before the null-message test: end-of-partition events carry no
                // message, so the opposite order made this branch unreachable.
                if (result.IsPartitionEOF)
                {
                    _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
                    await Task.Delay(TimeSpan.FromSeconds(1), token);
                    continue;
                }

                if (result.Message == null) continue;

                bool success = await messageHandler(result.Message.Value);
                if (success)
                {
                    consumer.Commit(result); // manual commit
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "消息消费失败: {Reason}", ex.Error.Reason);
            }
            catch (OperationCanceledException)
            {
                // Cancellation requested through Unsubscribe/Dispose: leave the loop quietly.
                break;
            }
            catch (ObjectDisposedException)
            {
                // The consumer was disposed while the loop was still running.
                break;
            }
            catch (Exception ex)
            {
                // A failing handler or commit must not silently terminate the background loop.
                _logger.LogError(ex, "消息处理失败: {Message}", ex.Message);
            }
        }
    });

    await Task.CompletedTask;
}
|
|
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
/// <summary>
/// Removes the consumer stored for the (TKey, TValue) pair, if any, then cancels its
/// token source and disposes both the consumer and the token source.
/// </summary>
/// <typeparam name="TKey">Message key type the consumer was registered with.</typeparam>
/// <typeparam name="TValue">Message value type the consumer was registered with.</typeparam>
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
{
    if (!_consumerStore.TryRemove(typeof((TKey, TValue)), out var removed))
    {
        // Nothing registered for this type pair.
        return;
    }

    var (storedConsumer, cancellation) = removed;

    cancellation.Cancel();
    if (storedConsumer is IDisposable disposable)
    {
        disposable.Dispose();
    }
    cancellation.Dispose();
}
|
|
|
|
|
|
|
2025-04-15 15:49:22 +08:00
|
|
|
|
/// <summary>
/// Releases every stored consumer: each entry is removed from the store, its token source is
/// cancelled, and the consumer and token source are disposed.
/// </summary>
/// <remarks>
/// Each entry is torn down independently, so a failure while releasing one consumer is logged
/// and does not prevent the remaining consumers from being released or escape from Dispose.
/// </remarks>
public void Dispose()
{
    // Keys is a snapshot; removing entry by entry guarantees that only entries actually taken
    // out of the store are disposed, and none is dropped without being disposed.
    foreach (var key in _consumerStore.Keys)
    {
        if (!_consumerStore.TryRemove(key, out var entry))
        {
            continue;
        }

        try
        {
            entry.CTS.Cancel();
            (entry.Consumer as IDisposable)?.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "释放消费者资源失败: {Message}", ex.Message);
        }
        finally
        {
            entry.CTS.Dispose();
        }
    }
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|