using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.Producer
{
    public class ProducerService<TKey, TValue> : IProducerService, IDisposable
    {
        private readonly ILogger<ProducerService<TKey, TValue>> _logger;

        public ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        public IProducer<TKey, TValue> Instance { get; set; } = default!;

        public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
        {
            ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
            var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);

            var producerConfig = new ProducerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                AllowAutoCreateTopics = true,
                QueueBufferingMaxKbytes = 2097151,      // Raise the client-side send buffer cap to ~2 GB (default is 1 GB)
                CompressionType = CompressionType.Lz4,  // Compress with LZ4; alternatives: gzip/snappy/zstd
                BatchSize = 32768,                      // 32 KB batches
                LingerMs = 20,                          // Wait up to 20 ms for a batch to fill before sending
                Acks = Acks.All,                        // A send counts as committed only after all in-sync replicas acknowledge it
                MessageSendMaxRetries = 50,             // Retry a failed send up to 50 times
            };

            if (enableAuthorization)
            {
                producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                producerConfig.SaslMechanism = SaslMechanism.Plain;
                producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
            return Instance;
        }

        public async Task ProduceAsync(string topic, TKey key, TValue value)
        {
            var message = new Message<TKey, TValue> { Key = key, Value = value };
            await Instance.ProduceAsync(topic, message);
        }

        public async Task ProduceAsync(string topic, TValue value)
        {
            var message = new Message<TKey, TValue> { Value = value };
            await Instance.ProduceAsync(topic, message);
        }

        public void Dispose()
        {
            Instance?.Dispose();
        }
    }
}