219 lines
8.5 KiB
C#
Raw Normal View History

2025-04-09 14:33:20 +08:00
using System;
2025-04-15 15:49:22 +08:00
using System.Collections.Concurrent;
2025-04-09 14:33:20 +08:00
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
2025-04-12 15:11:18 +08:00
using JiShe.CollectBus.Kafka.Consumer;
2025-04-09 14:33:20 +08:00
using Microsoft.Extensions.Configuration;
2025-04-12 15:11:18 +08:00
using Microsoft.Extensions.Logging;
2025-04-17 13:54:18 +08:00
using Microsoft.Extensions.Options;
2025-04-09 14:33:20 +08:00
using Volo.Abp.DependencyInjection;
using YamlDotNet.Serialization;
2025-04-09 14:33:20 +08:00
namespace JiShe.CollectBus.Kafka.Producer
{
2025-04-15 15:49:22 +08:00
public class ProducerService: IProducerService, IDisposable
2025-04-09 14:33:20 +08:00
{
2025-04-15 15:49:22 +08:00
private readonly ILogger<ProducerService> _logger;
private readonly IConfiguration _configuration;
2025-04-16 18:26:25 +08:00
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
2025-04-17 11:42:35 +08:00
private readonly KafkaOptionConfig _kafkaOptionConfig;
2025-04-17 13:54:18 +08:00
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
2025-04-09 14:33:20 +08:00
{
2025-04-15 15:49:22 +08:00
_configuration = configuration;
2025-04-12 15:11:18 +08:00
_logger = logger;
2025-04-17 13:54:18 +08:00
_kafkaOptionConfig = kafkaOptionConfig.Value;
2025-04-09 14:33:20 +08:00
}
2025-04-15 15:49:22 +08:00
#region private
/// <summary>
/// 创建生产者实例
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
2025-04-16 18:26:25 +08:00
private IProducer<TKey, TValue> GetProducer<TKey, TValue>(Type typeKey)
2025-04-15 15:49:22 +08:00
{
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
{
var config = BuildProducerConfig();
return new ProducerBuilder<TKey, TValue>(config)
2025-04-16 18:26:25 +08:00
.SetValueSerializer(new JsonSerializer<TValue>()) // Value 使用自定义 JSON 序列化
2025-04-15 15:49:22 +08:00
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
.Build();
});
}
2025-04-12 15:11:18 +08:00
2025-04-15 15:49:22 +08:00
/// <summary>
/// 配置
/// </summary>
/// <returns></returns>
private ProducerConfig BuildProducerConfig()
2025-04-12 15:11:18 +08:00
{
2025-04-15 15:49:22 +08:00
var config = new ProducerConfig
2025-04-12 15:11:18 +08:00
{
2025-04-17 11:42:35 +08:00
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
2025-04-14 19:10:27 +08:00
AllowAutoCreateTopics = true,
2025-04-15 15:49:22 +08:00
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
2025-04-14 19:10:27 +08:00
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
2025-04-15 15:49:22 +08:00
BatchSize = 32_768, // 修改批次大小为32K
2025-04-14 19:10:27 +08:00
LingerMs = 20, // 修改等待时间为20ms
2025-04-15 11:15:22 +08:00
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
2025-04-14 19:10:27 +08:00
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs
2025-04-12 15:11:18 +08:00
};
2025-04-17 11:42:35 +08:00
if (_kafkaOptionConfig.EnableAuthorization)
2025-04-12 15:11:18 +08:00
{
2025-04-17 11:42:35 +08:00
config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _kafkaOptionConfig.SaslPassword;
2025-04-12 15:11:18 +08:00
}
2025-04-15 15:49:22 +08:00
return config;
}
private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
{
SyslogLevel.Emergency => LogLevel.Critical,
SyslogLevel.Alert => LogLevel.Critical,
SyslogLevel.Critical => LogLevel.Critical,
SyslogLevel.Error => LogLevel.Error,
SyslogLevel.Warning => LogLevel.Warning,
SyslogLevel.Notice => LogLevel.Information,
SyslogLevel.Info => LogLevel.Information,
SyslogLevel.Debug => LogLevel.Debug,
_ => LogLevel.None
};
#endregion
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{
2025-04-16 18:26:25 +08:00
var typeKey = typeof(KafkaProducer<TKey, TValue>);
var producer = GetProducer<TKey, TValue>(typeKey);
var message = new Message<TKey, TValue>
{
Key = key,
Value = value,
Headers = new Headers{
2025-04-17 11:42:35 +08:00
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
2025-04-16 18:26:25 +08:00
}
};
await producer.ProduceAsync(topic, message);
2025-04-15 15:49:22 +08:00
}
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
2025-04-16 18:26:25 +08:00
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
var message = new Message<string, TValue>
{
Value = value,
Headers = new Headers{
2025-04-17 11:42:35 +08:00
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
2025-04-16 18:26:25 +08:00
}
};
await producer.ProduceAsync(topic, message);
2025-04-12 15:11:18 +08:00
}
2025-04-15 15:49:22 +08:00
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class
2025-04-12 15:11:18 +08:00
{
var message = new Message<TKey, TValue>
{
Key = key,
2025-04-16 18:26:25 +08:00
Value = value,
Headers = new Headers{
2025-04-17 11:42:35 +08:00
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
2025-04-16 18:26:25 +08:00
}
2025-04-12 15:11:18 +08:00
};
2025-04-16 18:26:25 +08:00
var typeKey = typeof(KafkaProducer<TKey, TValue>);
var producer = GetProducer<TKey, TValue>(typeKey);
2025-04-15 15:49:22 +08:00
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
2025-04-12 15:11:18 +08:00
}
2025-04-15 15:49:22 +08:00
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<string, TValue>>? deliveryHandler = null) where TValue : class
2025-04-09 14:33:20 +08:00
{
var message = new Message<string, TValue>
2025-04-09 14:33:20 +08:00
{
2025-04-16 18:26:25 +08:00
Value = value,
Headers = new Headers{
2025-04-17 11:42:35 +08:00
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
2025-04-16 18:26:25 +08:00
}
2025-04-12 15:11:18 +08:00
};
2025-04-16 18:26:25 +08:00
var typeKey = typeof(KafkaProducer<string, TValue>);
var producer = GetProducer<string, TValue>(typeKey);
2025-04-15 15:49:22 +08:00
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
2025-04-12 15:11:18 +08:00
}
public void Dispose()
{
2025-04-15 15:49:22 +08:00
foreach (var producer in _producerCache.Values.OfType<IDisposable>())
{
producer.Dispose();
}
_producerCache.Clear();
2025-04-09 14:33:20 +08:00
}
}
}