2025-04-21 10:17:40 +08:00

547 lines
25 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
namespace JiShe.CollectBus.Kafka.Consumer
{
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger<ConsumerService> _logger;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
/// <summary>
/// ConsumerService
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
}
#region private
/// <summary>
/// 创建消费者
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder<TKey, TValue>(config)
.SetValueDeserializer(new JsonSerializer<TValue>())
.SetLogHandler((_, log) => _logger.LogInformation($"消费者Log: {log.Message}"))
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
.Build();
}
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
var config = new ConsumerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
GroupId = groupId ?? _kafkaOptionConfig.ServerTagName,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
};
if (_kafkaOptionConfig.EnableAuthorization)
{
config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _kafkaOptionConfig.SaslPassword;
}
return config;
}
#endregion
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<TKey, TValue>(groupId),
// cts
//)).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<TKey, TValue>(groupId);
consumer!.Subscribe(topics);
await Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)} 开始拉取消息....");
var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null)
continue;
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
{
consumer.Commit(result); // 手动提交
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
}
}
});
await Task.CompletedTask;
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <param name="groupId"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
try {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
var cts = new CancellationTokenSource();
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
//{
// string ssss = "";
//}
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
//(
// CreateConsumer<string, TValue>(groupId),
// cts
//)).Consumer as IConsumer<string, TValue>;
var consumer = CreateConsumer<Ignore, TValue>(groupId);
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
int count = 0;
while (!cts.IsCancellationRequested)
{
try
{
//_logger.LogInformation($"Kafka消费: {string.Join("", topics)}_{count} 开始拉取消息....");
count++;
var result = consumer.Consume(cts.Token);
if (result == null || result.Message == null || result.Message.Value == null)
{
await Task.Delay(500, cts.Token);
continue;
}
if (result.IsPartitionEOF)
{
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(100, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
await Task.Delay(500, cts.Token);
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
}
}
});
} catch (Exception ex)
{
_logger.LogWarning($"Kafka消费异常: {ex.Message}");
}
await Task.CompletedTask;
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TKey">消息Key类型</typeparam>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topic">主题</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
await SubscribeBatchAsync<TKey, TValue>(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout);
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TKey">消息Key类型</typeparam>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topics">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
public async Task SubscribeBatchAsync<TKey, TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null,int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof(KafkaConsumer<TKey, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
// (
// CreateConsumer<TKey, TValue>(groupId),
// cts
// )).Consumer as IConsumer<TKey, TValue>;
var consumer = CreateConsumer<string, TValue>(groupId);
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
if (result.IsPartitionEOF)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(10, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// 无消息时短暂等待
await Task.Delay(10, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// 任务取消,正常退出
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topic">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string topic, Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
}
/// <summary>
/// 批量订阅消息
/// </summary>
/// <typeparam name="TValue">消息Value类型</typeparam>
/// <param name="topics">主题列表</param>
/// <param name="messageBatchHandler">批量消息处理函数</param>
/// <param name="groupId">消费组ID</param>
/// <param name="batchSize">批次大小</param>
/// <param name="batchTimeout">批次超时时间</param>
/// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<List<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{
var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource();
//var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
// (
// CreateConsumer<string, TValue>(groupId),
// cts
// )).Consumer as IConsumer<string, TValue>;
var consumer= CreateConsumer<string, TValue> (groupId);
consumer!.Subscribe(topics);
var timeout = batchTimeout ?? TimeSpan.FromSeconds(5); // 默认超时时间调整为5秒
_ = Task.Run(async () =>
{
var messages = new List<(TValue Value, TopicPartitionOffset Offset)>();
//var messages = new List<ConsumeResult<TKey, TValue>>();
var startTime = DateTime.UtcNow;
while (!cts.IsCancellationRequested)
{
try
{
// 非阻塞快速累积消息
while (messages.Count < batchSize && (DateTime.UtcNow - startTime) < timeout)
{
var result = consumer.Consume(TimeSpan.Zero); // 非阻塞调用
if (result != null)
{
if (result.IsPartitionEOF)
{
//_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} 已消费完", result.Topic, result.Partition);
await Task.Delay(10, cts.Token);
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
//consumer.Commit(result); // 提交偏移量
// 跳过消息
continue;
}
}
messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value);
}
}
else
{
// 无消息时短暂等待
await Task.Delay(10, cts.Token);
}
}
// 处理批次
if (messages.Count > 0)
{
bool success = await messageBatchHandler(messages.Select(m => m.Value).ToList());
if (success)
{
var offsetsByPartition = new Dictionary<TopicPartition, long>();
foreach (var msg in messages)
{
var tp = msg.Offset.TopicPartition;
var offset = msg.Offset.Offset;
if (!offsetsByPartition.TryGetValue(tp, out var currentMax) || offset > currentMax)
{
offsetsByPartition[tp] = offset;
}
}
var offsetsToCommit = offsetsByPartition
.Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
.ToList();
consumer.Commit(offsetsToCommit);
}
messages.Clear();
}
startTime = DateTime.UtcNow;
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
catch (OperationCanceledException)
{
// 任务取消,正常退出
}
catch (Exception ex)
{
_logger.LogError(ex, "处理批量消息时发生未知错误");
}
}
}, cts.Token);
await Task.CompletedTask;
}
/// <summary>
/// 取消消息订阅
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
foreach (var entry in _consumerStore.Values)
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
_consumerStore.Clear();
}
}
}