using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using YamlDotNet.Serialization;

namespace JiShe.CollectBus.Kafka.Producer
{
    /// <summary>
    /// Publishes messages to Kafka. One underlying producer is created lazily per
    /// (key type, value type) pair and cached for the lifetime of this service.
    /// </summary>
    public class ProducerService : IProducerService, IDisposable
    {
        /// <summary>Name of the header attached to every outgoing message.</summary>
        private const string RouteKeyHeader = "route-key";

        /// <summary>Upper bound on how long <see cref="Dispose"/> waits for each producer to drain.</summary>
        private static readonly TimeSpan DisposeFlushTimeout = TimeSpan.FromSeconds(10);

        private readonly ILogger<ProducerService> _logger;

        // Not read anywhere in this class; kept because it is part of the constructor contract.
        private readonly IConfiguration _configuration;

        // Keyed by typeof(KafkaProducer<TKey, TValue>); the value wraps the matching IProducer<TKey, TValue>.
        private readonly ConcurrentDictionary<Type, ICachedProducer> _producerCache = new();

        private readonly KafkaOptionConfig _kafkaOptionConfig;

        /// <summary>
        /// Non-generic view over a cached producer so that <see cref="Dispose"/> can flush and
        /// release it without knowing its key/value types.
        /// </summary>
        private interface ICachedProducer : IDisposable
        {
            /// <summary>Flushes the wrapped producer and returns the value reported by its <c>Flush</c> call.</summary>
            int Flush(TimeSpan timeout);
        }

        /// <summary>
        /// Cache entry for one (key type, value type) pair. Its closed generic type also serves
        /// as the cache key.
        /// </summary>
        private sealed class KafkaProducer<TKey, TValue> : ICachedProducer
            where TKey : notnull
            where TValue : class
        {
            public KafkaProducer(IProducer<TKey, TValue> inner)
            {
                Inner = inner;
            }

            public IProducer<TKey, TValue> Inner { get; }

            public int Flush(TimeSpan timeout) => Inner.Flush(timeout);

            public void Dispose() => Inner.Dispose();
        }

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="configuration">Application configuration.</param>
        /// <param name="logger">Logger that also receives the Kafka client's own log output.</param>
        /// <param name="kafkaOptionConfig">Kafka connection and authorization options.</param>
        public ProducerService(
            IConfiguration configuration,
            ILogger<ProducerService> logger,
            IOptions<KafkaOptionConfig> kafkaOptionConfig)
        {
            _configuration = configuration;
            _logger = logger;
            _kafkaOptionConfig = kafkaOptionConfig.Value;
        }

        #region Private helpers

        /// <summary>
        /// Returns the cached producer for <paramref name="typeKey"/>, creating it on first use.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="typeKey">Cache key; callers pass <c>typeof(KafkaProducer&lt;TKey, TValue&gt;)</c>.</param>
        /// <returns>The producer shared by all callers using the same key.</returns>
        private IProducer<TKey, TValue> GetProducer<TKey, TValue>(Type typeKey)
            where TKey : notnull
            where TValue : class
        {
            if (_producerCache.TryGetValue(typeKey, out var cached))
            {
                return ((KafkaProducer<TKey, TValue>)cached).Inner;
            }

            // GetOrAdd with a value factory may invoke the factory more than once under
            // contention, which would leak the losing producer. Build one candidate here
            // and dispose it if another thread published its instance first.
            var candidate = new KafkaProducer<TKey, TValue>(BuildProducer<TKey, TValue>());
            var winner = _producerCache.GetOrAdd(typeKey, candidate);
            if (!ReferenceEquals(winner, candidate))
            {
                candidate.Dispose();
            }

            return ((KafkaProducer<TKey, TValue>)winner).Inner;
        }

        /// <summary>
        /// Builds a new producer whose values are serialized with the project's JSON serializer
        /// and whose client log output is forwarded to <see cref="_logger"/>.
        /// </summary>
        private IProducer<TKey, TValue> BuildProducer<TKey, TValue>()
            where TKey : notnull
            where TValue : class
        {
            var config = BuildProducerConfig();
            return new ProducerBuilder<TKey, TValue>(config)
                .SetValueSerializer(new JsonSerializer<TValue>()) // values use the custom JSON serializer
                .SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
                .Build();
        }

        /// <summary>
        /// Builds the producer configuration from <see cref="_kafkaOptionConfig"/>.
        /// </summary>
        /// <returns>The configuration, including SASL settings when authorization is enabled.</returns>
        private ProducerConfig BuildProducerConfig()
        {
            var config = new ProducerConfig
            {
                BootstrapServers = _kafkaOptionConfig.BootstrapServers,
                AllowAutoCreateTopics = true,
                QueueBufferingMaxKbytes = 2_097_151, // raise the buffer cap to ~2 GB (default is 1 GB)
                CompressionType = CompressionType.Lz4, // LZ4 compression; alternatives: gzip/snappy/zstd
                BatchSize = 32_768, // batch size of 32 KB
                LingerMs = 20, // wait up to 20 ms to fill a batch
                Acks = Acks.All, // committed only once all replica brokers have the message; Acks.Leader is an option
                MessageSendMaxRetries = 50, // retry a failed send up to 50 times
                MessageTimeoutMs = 120000, // 2-minute send timeout; must stay greater than LingerMs
            };

            if (_kafkaOptionConfig.EnableAuthorization)
            {
                config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
                config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
                config.SaslUsername = _kafkaOptionConfig.SaslUserName;
                config.SaslPassword = _kafkaOptionConfig.SaslPassword;
            }

            return config;
        }

        /// <summary>
        /// Maps a Kafka client syslog level to the corresponding <see cref="LogLevel"/>.
        /// Unrecognized levels map to <see cref="LogLevel.None"/>.
        /// </summary>
        private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
        {
            SyslogLevel.Emergency => LogLevel.Critical,
            SyslogLevel.Alert => LogLevel.Critical,
            SyslogLevel.Critical => LogLevel.Critical,
            SyslogLevel.Error => LogLevel.Error,
            SyslogLevel.Warning => LogLevel.Warning,
            SyslogLevel.Notice => LogLevel.Information,
            SyslogLevel.Info => LogLevel.Information,
            SyslogLevel.Debug => LogLevel.Debug,
            _ => LogLevel.None
        };

        /// <summary>
        /// Builds a message carrying the given key and value plus a <c>route-key</c> header
        /// containing the UTF-8 bytes of the configured server tag name.
        /// </summary>
        private Message<TKey, TValue> CreateMessage<TKey, TValue>(TKey key, TValue value)
            where TKey : notnull
            where TValue : class
        {
            return new Message<TKey, TValue>
            {
                Key = key,
                Value = value,
                Headers = new Headers
                {
                    { RouteKeyHeader, Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
                }
            };
        }

        /// <summary>
        /// Hands a message to the producer's callback-based <c>Produce</c> API, targeting a
        /// specific partition when one is supplied.
        /// </summary>
        private void Enqueue<TKey, TValue>(
            string topic,
            Message<TKey, TValue> message,
            int? partition,
            Action<DeliveryReport<TKey, TValue>>? deliveryHandler)
            where TKey : notnull
            where TValue : class
        {
            var producer = GetProducer<TKey, TValue>(typeof(KafkaProducer<TKey, TValue>));

            if (partition.HasValue)
            {
                producer.Produce(new TopicPartition(topic, partition.Value), message, deliveryHandler);
            }
            else
            {
                producer.Produce(topic, message, deliveryHandler);
            }
        }

        /// <summary>
        /// Flushes and then disposes one cached producer. Failures are logged rather than
        /// rethrown so that the remaining producers are still cleaned up.
        /// </summary>
        private void FlushAndDispose(Type typeKey, ICachedProducer producer)
        {
            try
            {
                // Confluent's documentation recommends calling Flush before disposing a
                // producer so queued and in-flight messages are not lost.
                var remaining = producer.Flush(DisposeFlushTimeout);
                if (remaining > 0)
                {
                    _logger.LogWarning(
                        "Kafka producer {ProducerType} still had {Remaining} message(s) queued after flushing for {Timeout}.",
                        typeKey,
                        remaining,
                        DisposeFlushTimeout);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to flush Kafka producer {ProducerType} during dispose.", typeKey);
            }

            try
            {
                producer.Dispose();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to dispose Kafka producer {ProducerType}.", typeKey);
            }
        }

        #endregion

        /// <summary>
        /// Publishes a keyed message and awaits the producer's <c>ProduceAsync</c> call.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Destination topic.</param>
        /// <param name="key">Message key.</param>
        /// <param name="value">Message payload.</param>
        public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)
            where TKey : notnull
            where TValue : class
        {
            var message = CreateMessage(key, value);
            var producer = GetProducer<TKey, TValue>(typeof(KafkaProducer<TKey, TValue>));
            await producer.ProduceAsync(topic, message);
        }

        /// <summary>
        /// Publishes a message keyed by the configured server tag name and awaits the
        /// producer's <c>ProduceAsync</c> call.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Destination topic.</param>
        /// <param name="value">Message payload.</param>
        public async Task ProduceAsync<TValue>(string topic, TValue value)
            where TValue : class
        {
            var message = CreateMessage<string, TValue>(_kafkaOptionConfig.ServerTagName, value);
            var producer = GetProducer<string, TValue>(typeof(KafkaProducer<string, TValue>));
            await producer.ProduceAsync(topic, message);
        }

        /// <summary>
        /// Enqueues a keyed message through the callback-based <c>Produce</c> API. The returned
        /// task completes once the message has been handed to the producer; the outcome is
        /// reported through <paramref name="deliveryHandler"/>.
        /// </summary>
        /// <typeparam name="TKey">Message key type.</typeparam>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Destination topic.</param>
        /// <param name="key">Message key.</param>
        /// <param name="value">Message payload.</param>
        /// <param name="partition">Target partition, or <c>null</c> to produce to the topic without naming one.</param>
        /// <param name="deliveryHandler">Optional callback passed to the producer.</param>
        public async Task ProduceAsync<TKey, TValue>(
            string topic,
            TKey key,
            TValue value,
            int? partition = null,
            Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
            where TKey : notnull
            where TValue : class
        {
            var message = CreateMessage(key, value);
            Enqueue(topic, message, partition, deliveryHandler);
            await Task.CompletedTask;
        }

        /// <summary>
        /// Enqueues a message keyed by the configured server tag name through the callback-based
        /// <c>Produce</c> API. The returned task completes once the message has been handed to
        /// the producer; the outcome is reported through <paramref name="deliveryHandler"/>.
        /// </summary>
        /// <typeparam name="TValue">Message value type.</typeparam>
        /// <param name="topic">Destination topic.</param>
        /// <param name="value">Message payload.</param>
        /// <param name="partition">Target partition, or <c>null</c> to produce to the topic without naming one.</param>
        /// <param name="deliveryHandler">Optional callback passed to the producer.</param>
        public async Task ProduceAsync<TValue>(
            string topic,
            TValue value,
            int? partition = null,
            Action<DeliveryReport<string, TValue>>? deliveryHandler = null)
            where TValue : class
        {
            var message = CreateMessage<string, TValue>(_kafkaOptionConfig.ServerTagName, value);
            Enqueue(topic, message, partition, deliveryHandler);
            await Task.CompletedTask;
        }

        /// <summary>
        /// Flushes and disposes every cached producer. Each entry is removed from the cache
        /// before it is released, so a repeated call finds nothing left to dispose.
        /// </summary>
        public void Dispose()
        {
            foreach (var typeKey in _producerCache.Keys)
            {
                if (_producerCache.TryRemove(typeKey, out var producer))
                {
                    FlushAndDispose(typeKey, producer);
                }
            }
        }
    }
}