using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using YamlDotNet.Serialization;

namespace JiShe.CollectBus.Kafka.Producer
{
    public class ProducerService : IProducerService, IDisposable
    {
        private readonly ILogger<ProducerService> _logger;
        private readonly IConfiguration _configuration;
        private readonly ConcurrentDictionary<Type, object> _producerCache = new();

        // Marker type used only as a cache key, so each (TKey, TValue) combination gets its own producer instance.
        private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }

        private readonly KafkaOptionConfig _kafkaOptionConfig;

        public ProducerService(IConfiguration configuration, ILogger<ProducerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
        {
            _configuration = configuration;
            _logger = logger;
            _kafkaOptionConfig = kafkaOptionConfig.Value;
        }

        #region Private methods

        /// <summary>
        /// Creates (or returns a cached) producer instance for the given key/value types.
        /// </summary>
        /// <typeparam name="TKey"></typeparam>
        /// <typeparam name="TValue"></typeparam>
        /// <returns></returns>
        private IProducer<TKey, TValue> GetProducer<TKey, TValue>(Type typeKey)
        {
            return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
            {
                var config = BuildProducerConfig();
                return new ProducerBuilder<TKey, TValue>(config)
                    // Values use the custom JSON serializer; keys rely on Confluent.Kafka's built-in
                    // serializers (string, int, long, byte[], Null, ...), so other key types fail at produce time.
                    .SetValueSerializer(new JsonSerializer<TValue>())
                    .SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
                    .Build();
            });
        }

        /// <summary>
        /// Builds the producer configuration.
        /// </summary>
        /// <returns></returns>
        private ProducerConfig BuildProducerConfig()
        {
            var config = new ProducerConfig
            {
                BootstrapServers = _kafkaOptionConfig.BootstrapServers,
                //AllowAutoCreateTopics = true,
                QueueBufferingMaxKbytes = 2_097_151,   // Raise the producer queue buffer to its ~2 GB maximum (default is 1 GB)
                CompressionType = CompressionType.Lz4, // Compress with LZ4 (alternatives: gzip/snappy/zstd)
                BatchSize = 32_768,                    // Batch size of 32 KB
                LingerMs = 20,                         // Wait up to 20 ms to fill a batch before sending
                Acks = Acks.All,                       // A message counts as committed only after all in-sync replicas have it; Acks.Leader is a faster alternative
                MessageSendMaxRetries = 50,            // Retry failed sends up to 50 times
                MessageTimeoutMs = 120000,             // Delivery timeout of 2 minutes; MessageTimeoutMs must be greater than LingerMs
            };

            if (_kafkaOptionConfig.EnableAuthorization)
            {
                config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
                config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
                config.SaslUsername = _kafkaOptionConfig.SaslUserName;
                config.SaslPassword = _kafkaOptionConfig.SaslPassword;
            }

            return config;
        }
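
        // For reference, the options consumed above suggest a configuration section along these lines.
        // This is a sketch only: the section name ("Kafka") and the example values are assumptions; the
        // actual binding of KafkaOptionConfig lives outside this class.
        //
        //   "Kafka": {
        //     "BootstrapServers": "broker1:9092,broker2:9092",
        //     "ServerTagName": "collect-bus-node-1",
        //     "EnableAuthorization": true,
        //     "SecurityProtocol": "SaslPlaintext",
        //     "SaslMechanism": "Plain",
        //     "SaslUserName": "user",
        //     "SaslPassword": "secret"
        //   }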

        private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
        {
            SyslogLevel.Emergency => LogLevel.Critical,
            SyslogLevel.Alert => LogLevel.Critical,
            SyslogLevel.Critical => LogLevel.Critical,
            SyslogLevel.Error => LogLevel.Error,
            SyslogLevel.Warning => LogLevel.Warning,
            SyslogLevel.Notice => LogLevel.Information,
            SyslogLevel.Info => LogLevel.Information,
            SyslogLevel.Debug => LogLevel.Debug,
            _ => LogLevel.None
        };

        #endregion

        /// <summary>
        /// Publishes a message with an explicit key and awaits broker acknowledgement.
        /// </summary>
        /// <typeparam name="TKey"></typeparam>
        /// <typeparam name="TValue"></typeparam>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value) where TKey : notnull where TValue : class
        {
            var typeKey = typeof(KafkaProducer<TKey, TValue>);
            var producer = GetProducer<TKey, TValue>(typeKey);
            var message = new Message<TKey, TValue>
            {
                Key = key,
                Value = value,
                Headers = new Headers{
                    { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
                }
            };
            await producer.ProduceAsync(topic, message);
        }
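
        // Note: the awaited ProduceAsync above completes only once the broker has acknowledged the message
        // (per Acks.All) and throws ProduceException<TKey, TValue> if delivery ultimately fails, so callers
        // needing at-least-once behaviour should catch it. Minimal sketch (hypothetical caller and types):
        //
        //   try
        //   {
        //       await producerService.ProduceAsync("device-data", "meter-01", payload);
        //   }
        //   catch (ProduceException<string, DevicePayload> ex)
        //   {
        //       logger.LogError(ex, "Kafka delivery failed: {Reason}", ex.Error.Reason);
        //   }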

        /// <summary>
        /// Publishes a message without a key and awaits broker acknowledgement.
        /// </summary>
        /// <typeparam name="TValue"></typeparam>
        /// <param name="topic"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
        {
            // Cache key mirrors the producer's actual generic arguments (Null, TValue) so later lookups cast correctly.
            var typeKey = typeof(KafkaProducer<Null, TValue>);
            var producer = GetProducer<Null, TValue>(typeKey);
            var message = new Message<Null, TValue>
            {
                //Key= _kafkaOptionConfig.ServerTagName,
                Value = value,
                Headers = new Headers{
                    { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
                }
            };
            await producer.ProduceAsync(topic, message);
        }

        /// <summary>
        /// Publishes a message with an explicit key, optionally to a specific partition, using the
        /// fire-and-forget Produce API with an optional delivery callback.
        /// </summary>
        /// <typeparam name="TKey"></typeparam>
        /// <typeparam name="TValue"></typeparam>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <param name="partition"></param>
        /// <param name="deliveryHandler"></param>
        /// <returns></returns>
        public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition = null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class
        {
            var message = new Message<TKey, TValue>
            {
                Key = key,
                Value = value,
                Headers = new Headers{
                    { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
                }
            };
            var typeKey = typeof(KafkaProducer<TKey, TValue>);
            var producer = GetProducer<TKey, TValue>(typeKey);
            if (partition.HasValue)
            {
                var topicPartition = new TopicPartition(topic, partition.Value);
                producer.Produce(topicPartition, message, deliveryHandler);
            }
            else
            {
                producer.Produce(topic, message, deliveryHandler);
            }
            await Task.CompletedTask;
        }
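
        // Unlike the awaited overloads above, this overload (and the key-less one below) enqueues the
        // message and returns immediately; delivery success or failure is reported only through the
        // optional callback, which librdkafka invokes on a background poll thread. Minimal sketch
        // (hypothetical caller, topic, and payload):
        //
        //   await producerService.ProduceAsync("device-data", "meter-01", payload, partition: 0,
        //       report =>
        //       {
        //           if (report.Error.IsError)
        //               logger.LogError("Delivery failed: {Reason}", report.Error.Reason);
        //       });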

        /// <summary>
        /// Publishes a message without a key, optionally to a specific partition, using the
        /// fire-and-forget Produce API with an optional delivery callback.
        /// </summary>
        /// <typeparam name="TValue"></typeparam>
        /// <param name="topic"></param>
        /// <param name="value"></param>
        /// <param name="partition"></param>
        /// <param name="deliveryHandler"></param>
        /// <returns></returns>
        public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
        {
            var message = new Message<Null, TValue>
            {
                //Key = _kafkaOptionConfig.ServerTagName,
                Value = value,
                Headers = new Headers{
                    { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
                }
            };
            var typeKey = typeof(KafkaProducer<Null, TValue>);
            var producer = GetProducer<Null, TValue>(typeKey);
            if (partition.HasValue)
            {
                var topicPartition = new TopicPartition(topic, partition.Value);
                producer.Produce(topicPartition, message, deliveryHandler);
            }
            else
            {
                producer.Produce(topic, message, deliveryHandler);
            }
            await Task.CompletedTask;
        }

        public void Dispose()
        {
            // Dispose each cached producer. Confluent.Kafka's Dispose does not flush outstanding messages,
            // so anything still queued by the fire-and-forget overloads may be dropped here.
            foreach (var producer in _producerCache.Values.OfType<IDisposable>())
            {
                producer.Dispose();
            }
            _producerCache.Clear();
        }
    }
}
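
// Usage sketch (hypothetical caller; assumes IProducerService / ProducerService is registered with the
// application's DI container by the hosting module):
//
//   public class MeterReadingPublisher
//   {
//       private readonly IProducerService _producer;
//
//       public MeterReadingPublisher(IProducerService producer) => _producer = producer;
//
//       // Keyed publish: readings for the same device land on the same partition.
//       public Task PublishAsync(string deviceId, MeterReading reading) =>
//           _producer.ProduceAsync("meter-readings", deviceId, reading);
//   }
//
// MeterReadingPublisher, MeterReading, and the "meter-readings" topic are illustrative names only.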