From be076a81e5dcc8bdb3c3c9c41f967df5b316788a Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 15 Apr 2025 15:49:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Samples/SampleAppService.cs | 15 +- .../Attributes/KafkaSubscribeAttribute.cs | 22 +- .../CollectBusKafkaModule.cs | 4 +- .../Consumer/ConsumerService.cs | 233 +++++++++++++----- .../Consumer/IConsumerService.cs | 26 +- .../KafkaSubcribesExtensions.cs | 55 +++-- .../Producer/IProducerService.cs | 10 +- .../Producer/ProducerService.cs | 169 ++++++++++--- 8 files changed, 390 insertions(+), 144 deletions(-) diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 8471d2d..e9ce063 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -17,10 +17,13 @@ using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.Kafka.Attributes; +using System.Text.Json; +using JiShe.CollectBus.Kafka; namespace JiShe.CollectBus.Samples; -public class SampleAppService : CollectBusAppService, ISampleAppService +public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe { private readonly ILogger _logger; private readonly IIoTDBProvider _iotDBProvider; @@ -181,9 +184,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService return aa == null; } - //[AllowAnonymous] - //public async Task KafkaAsync() - //{ + [KafkaSubscribe(["test-topic"])] - //} + public async Task KafkaSubscribeAsync(string obj) + { + _logger.LogWarning($"收到订阅消息: {obj}"); + await Task.CompletedTask; + } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs index 5f70b77..7a059e0 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs @@ -22,28 +22,30 @@ namespace JiShe.CollectBus.Kafka.Attributes /// /// 消费者组 /// - public string GroupId { get; set; } = "default"; + public string GroupId { get; set; } - public KafkaSubscribeAttribute(string[] topics) - { - this.Topics = topics; - } - - public KafkaSubscribeAttribute(string[] topics, string groupId) + public KafkaSubscribeAttribute(string[] topics, string groupId = "default") { this.Topics = topics; this.GroupId = groupId; } - public KafkaSubscribeAttribute(string[] topics, int partition) + public KafkaSubscribeAttribute(string topic, string groupId = "default") + { + this.Topics = new string[] { topic }; + this.GroupId = groupId; + } + public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default") { this.Topics = topics; this.Partition = partition; + this.GroupId = groupId; } - public KafkaSubscribeAttribute(string[] topics, int partition, string groupId) + + public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default") { - this.Topics = topics; + this.Topics = new string[] { topic }; this.Partition = partition; this.GroupId = groupId; } diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index f0462b0..153e5ef 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -17,9 +17,9 @@ namespace JiShe.CollectBus.Kafka public override void ConfigureServices(ServiceConfigurationContext context) { // 注册Producer - context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>)); + context.Services.AddTransient(); // 注册Consumer - context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>)); + context.Services.AddTransient(); } public override void OnApplicationInitialization(ApplicationInitializationContext context) diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index a7626dd..08b8cb1 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -6,108 +6,205 @@ using JiShe.CollectBus.Kafka.Attributes; using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Kafka.AdminClient; using static Confluent.Kafka.ConfigPropertyNames; +using System.Collections.Concurrent; +using System.Text.RegularExpressions; +using NUglify.Html; namespace JiShe.CollectBus.Kafka.Consumer { - public abstract class ConsumerService : IConsumerService, IDisposable + public class ConsumerService : IConsumerService, IDisposable { - private readonly ILogger> _logger; - private CancellationTokenSource _cancellationTokenSource; + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + private readonly ConcurrentDictionary + _consumerStore = new(); - protected ConsumerService(IConfiguration configuration, ILogger> logger) + public ConsumerService(IConfiguration configuration, ILogger logger) { + _configuration = configuration; _logger = logger; - GetInstance(configuration); } + #region private 私有方法 - public IConsumer Instance { get; set; } = default; - - public IConsumer GetInstance(IConfiguration configuration) + /// + /// 创建消费者 + /// + /// + /// + /// + private IConsumer CreateConsumer(string? groupId = null) where TKey : notnull where TValue : class { + var config = BuildConsumerConfig(groupId); + return new ConsumerBuilder(config) + .SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}")) + .Build(); + } - ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); - var consumerConfig = new ConsumerConfig + private ConsumerConfig BuildConsumerConfig(string? groupId = null) + { + var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!); + + var config = new ConsumerConfig { - BootstrapServers = configuration["Kafka:BootstrapServers"], + BootstrapServers = _configuration["Kafka:BootstrapServers"], + GroupId = groupId ?? "default", AutoOffsetReset = AutoOffsetReset.Earliest, - EnableAutoCommit = false, // 禁止AutoCommit - Acks = Acks.All, // 需要所有副本响应才算消费完成 + EnableAutoCommit = false // 禁止AutoCommit }; - if (enableAuthorization) + if (enableAuth) { - consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; - consumerConfig.SaslMechanism = SaslMechanism.Plain; - consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + config.SecurityProtocol = SecurityProtocol.SaslPlaintext; + config.SaslMechanism = SaslMechanism.Plain; + config.SaslUsername = _configuration["Kafka:SaslUserName"]; + config.SaslPassword = _configuration["Kafka:SaslPassword"]; } - Instance = new ConsumerBuilder(consumerConfig).Build(); - return Instance; + + return config; + } + #endregion + + /// + /// 订阅消息 + /// + /// + /// + /// + /// + /// + public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class + { + await SubscribeAsync(new[] { topic }, messageHandler, groupId); } - public async Task SubscribeAsync(string topic, Func messageHandler) - { - _cancellationTokenSource = new CancellationTokenSource(); - Instance.Subscribe(topic); - try + /// + /// 订阅消息 + /// + /// + /// + /// + /// + public async Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class + { + await SubscribeAsync(new[] { topic }, messageHandler,groupId); + } + + /// + /// 订阅消息 + /// + /// + /// + /// + /// + /// + public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TKey : notnull where TValue : class + { + var consumerKey = typeof((TKey, TValue)); + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _ => + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + _ = Task.Run(async () => { - while (!_cancellationTokenSource.Token.IsCancellationRequested) + while (!cts.IsCancellationRequested) { - var result = Instance.Consume(_cancellationTokenSource.Token); - if (result != null) + try { - await messageHandler(result.Message.Key, result.Message.Value); + var result = consumer.Consume(cts.Token); + bool sucess= await messageHandler(result.Message.Key, result.Message.Value); + if (sucess) + { + consumer.Commit(result); // 手动提交 + } + } + catch (ConsumeException ex) + { + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); } } - } - catch (OperationCanceledException) + }); + await Task.CompletedTask; + } + + + + /// + /// 订阅消息 + /// + /// + /// + /// + /// + /// + public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class + { + var consumerKey = typeof((Null, TValue)); + var cts = new CancellationTokenSource(); + + var consumer = _consumerStore.GetOrAdd(consumerKey, _=> + ( + CreateConsumer(groupId), + cts + )).Consumer as IConsumer; + + consumer!.Subscribe(topics); + + _ = Task.Run(async () => { - Instance.Close(); + while (!cts.IsCancellationRequested) + { + try + { + var result = consumer.Consume(cts.Token); + bool sucess = await messageHandler(result.Message.Value); + if (sucess) + consumer.Commit(result); // 手动提交 + } + catch (ConsumeException ex) + { + _logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); + } + } + }); + await Task.CompletedTask; + } + + /// + /// 取消消息订阅 + /// + /// + /// + public void Unsubscribe() where TKey : notnull where TValue : class + { + var consumerKey = typeof((TKey, TValue)); + if (_consumerStore.TryRemove(consumerKey, out var entry)) + { + entry.CTS.Cancel(); + (entry.Consumer as IDisposable)?.Dispose(); + entry.CTS.Dispose(); } } /// - /// 订阅多个topic + /// 释放资源 /// - /// - /// - /// - public async Task SubscribeAsync(string[] topics, Func messageHandler) - { - _cancellationTokenSource = new CancellationTokenSource(); - Instance.Subscribe(topics); - - try - { - while (!_cancellationTokenSource.Token.IsCancellationRequested) - { - var result = Instance.Consume(_cancellationTokenSource.Token); - if (result != null) - { - await messageHandler(result.Message.Key, result.Message.Value); - } - } - } - catch (OperationCanceledException) - { - Instance.Close(); - } - } - - public void Unsubscribe() - { - _cancellationTokenSource?.Cancel(); - Instance?.Unsubscribe(); - } - public void Dispose() { - Unsubscribe(); - Instance?.Dispose(); - _cancellationTokenSource?.Dispose(); + foreach (var entry in _consumerStore.Values) + { + entry.CTS.Cancel(); + (entry.Consumer as IDisposable)?.Dispose(); + entry.CTS.Dispose(); + } + _consumerStore.Clear(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs index 990bc86..5cfae2c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -6,18 +6,32 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Consumer { - public interface IConsumerService + public interface IConsumerService { - Task SubscribeAsync(string topic, Func messageHandler); - void Unsubscribe(); - void Dispose(); + Task SubscribeAsync(string topic, Func> messageHandler, string? groupId=null) where TKey : notnull where TValue : class; /// - /// 订阅多个topic + /// 订阅消息 /// + /// + /// + /// + /// + Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class; + + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TKey : notnull where TValue : class; + + + /// + /// 订阅消息 + /// + /// + /// /// /// /// - Task SubscribeAsync(string[] topics, Func messageHandler); + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; + + void Unsubscribe() where TKey : notnull where TValue : class; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index fede540..6a5f5cb 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -4,6 +4,8 @@ using JiShe.CollectBus.Kafka.Consumer; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Primitives; using Newtonsoft.Json; using System; using System.Collections.Generic; @@ -11,6 +13,7 @@ using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; +using static Confluent.Kafka.ConfigPropertyNames; namespace JiShe.CollectBus.Kafka { @@ -55,14 +58,13 @@ namespace JiShe.CollectBus.Kafka /// private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider) { - var methods = subscribe.GetType().GetMethods(); - foreach (var method in methods) + var subscribedMethods = subscribe.GetType().GetMethods() + .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) + .Where(x => x.Attribute != null) + .ToArray(); + foreach (var sub in subscribedMethods) { - var attr = method.GetCustomAttribute(); - if (attr == null) continue; - - // 启动后台消费线程 - Task.Run(() => StartConsumerAsync(provider, attr, method, subscribe)); + Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe)); } } @@ -76,43 +78,54 @@ namespace JiShe.CollectBus.Kafka /// private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe) { - var consumerService = provider.GetRequiredService>(); - await consumerService.SubscribeAsync(attr.Topics, async (key, message) => + var consumerService = provider.GetRequiredService(); + var logger = provider.GetRequiredService>(); + await consumerService.SubscribeAsync(attr.Topics, async (message) => { try { - if (string.IsNullOrEmpty(message)) - await Task.CompletedTask ; - // 处理消息 - await ProcessMessageAsync(message, method, subscribe); + return await ProcessMessageAsync(message, method, subscribe); } catch (ConsumeException ex) { // 处理消费错误 - throw; + logger.LogError($"kafka消费异常:{ex.Message}"); } + return await Task.FromResult(false); }); } - private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe) + /// + /// 处理消息 + /// + /// + /// + /// + /// + private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); - if (parameters.Length != 1) return; + if (parameters.Length != 1) + return true; var paramType = parameters[0].ParameterType; - var messageObj = paramType == typeof(string) - ? message - : JsonConvert.DeserializeObject(message, paramType); + var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType); if (method.ReturnType == typeof(Task)) { - await (Task)method.Invoke(subscribe, new[] { messageObj })!; + object? result = await (Task)method.Invoke(subscribe, new[] { messageObj })!; + if (result is bool success) + return success; } else { - method.Invoke(subscribe, new[] { messageObj }); + object? result = method.Invoke(subscribe, new[] { messageObj }); + if (result is bool success) + return success; } + + return false; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs index bd9b21b..becea90 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -9,8 +9,12 @@ namespace JiShe.CollectBus.Kafka.Producer { public interface IProducerService { - Task ProduceAsync(string topic, TKey key, TValue value); - Task ProduceAsync(string topic, TValue value); - void Dispose(); + Task ProduceAsync(string topic, TKey key, TValue value) where TKey : notnull where TValue : class; + + Task ProduceAsync(string topic, TValue value) where TValue : class; + + Task ProduceAsync(string topic, TKey key, TValue value, int? partition, Action>? deliveryHandler = null) where TKey : notnull where TValue : class; + + Task ProduceAsync(string topic, TValue value, int? partition = null, Action>? deliveryHandler = null) where TValue : class; } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 99d3dd3..ca45f8c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -11,70 +12,180 @@ using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.Kafka.Producer { - public class ProducerService : IProducerService, IDisposable + public class ProducerService: IProducerService, IDisposable { + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + private readonly ConcurrentDictionary, object> _producerCache = new(); - private readonly ILogger> _logger; - - protected ProducerService(IConfiguration configuration, ILogger> logger) + public ProducerService(IConfiguration configuration,ILogger logger) { + _configuration = configuration; _logger = logger; - GetInstance(configuration); } - - public IProducer Instance { get; set; } = default; - - public IProducer GetInstance(IConfiguration configuration) + #region private 私有方法 + /// + /// 创建生产者实例 + /// + /// + /// + /// + private IProducer GetProducer() { - ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); - var consumerConfig = new ProducerConfig + var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!; + + return (IProducer)_producerCache.GetOrAdd(typeKey, _ => { - BootstrapServers = configuration["Kafka:BootstrapServers"], + var config = BuildProducerConfig(); + return new ProducerBuilder(config) + .SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message)) + .Build(); + }); + } + + /// + /// 配置 + /// + /// + private ProducerConfig BuildProducerConfig() + { + var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!); + + var config = new ProducerConfig + { + BootstrapServers = _configuration["Kafka:BootstrapServers"], AllowAutoCreateTopics = true, - QueueBufferingMaxKbytes = 2097151, // 修改缓冲区最大为2GB,默认为1GB + QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd - BatchSize = 32768, // 修改批次大小为32K + BatchSize = 32_768, // 修改批次大小为32K LingerMs = 20, // 修改等待时间为20ms Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader MessageSendMaxRetries = 50, // 消息发送失败最大重试50次 }; - if (enableAuthorization) + if (enableAuth) { - consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; - consumerConfig.SaslMechanism = SaslMechanism.Plain; - consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + config.SecurityProtocol = SecurityProtocol.SaslPlaintext; + config.SaslMechanism = SaslMechanism.Plain; + config.SaslUsername = _configuration["Kafka:SaslUserName"]; + config.SaslPassword = _configuration["Kafka:SaslPassword"]; } - Instance = new ProducerBuilder(consumerConfig).Build(); - return Instance; + + return config; } - public async Task ProduceAsync(string topic, TKey key, TValue value) + + private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch + { + SyslogLevel.Emergency => LogLevel.Critical, + SyslogLevel.Alert => LogLevel.Critical, + SyslogLevel.Critical => LogLevel.Critical, + SyslogLevel.Error => LogLevel.Error, + SyslogLevel.Warning => LogLevel.Warning, + SyslogLevel.Notice => LogLevel.Information, + SyslogLevel.Info => LogLevel.Information, + SyslogLevel.Debug => LogLevel.Debug, + _ => LogLevel.None + }; + + #endregion + + /// + /// 发布消息 + /// + /// + /// + /// + /// + /// + /// + public async Task ProduceAsync(string topic, TKey key, TValue value)where TKey : notnull where TValue : class + { + var producer = GetProducer(); + await producer.ProduceAsync(topic, new Message { Key = key, Value = value }); + } + + /// + /// 发布消息 + /// + /// + /// + /// + /// + public async Task ProduceAsync(string topic, TValue value) where TValue : class + { + var producer = GetProducer(); + await producer.ProduceAsync(topic, new Message { Value = value }); + } + + /// + /// 发布消息 + /// + /// + /// + /// + /// + /// + /// + /// + /// + public async Task ProduceAsync(string topic,TKey key,TValue value,int? partition=null, Action>? deliveryHandler = null)where TKey : notnull where TValue : class { var message = new Message { Key = key, Value = value }; + var producer = GetProducer(); + if (partition.HasValue) + { + var topicPartition = new TopicPartition(topic, partition.Value); + producer.Produce(topicPartition, message, deliveryHandler); + } + else + { + producer.Produce(topic, message, deliveryHandler); + } + await Task.CompletedTask; - await Instance.ProduceAsync(topic, message); } - public async Task ProduceAsync(string topic, TValue value) + /// + /// 发布消息 + /// + /// + /// + /// + /// + /// + /// + /// + public async Task ProduceAsync(string topic, TValue value, int? partition=null, Action>? deliveryHandler = null) where TValue : class { - var message = new Message + var message = new Message { Value = value }; - - await Instance.ProduceAsync(topic, message); + var producer = GetProducer(); + if (partition.HasValue) + { + var topicPartition = new TopicPartition(topic, partition.Value); + producer.Produce(topicPartition, message, deliveryHandler); + } + else + { + producer.Produce(topic, message, deliveryHandler); + } + await Task.CompletedTask; } public void Dispose() { - Instance?.Dispose(); + foreach (var producer in _producerCache.Values.OfType()) + { + producer.Dispose(); + } + _producerCache.Clear(); } } }