优化kafka发布订阅

This commit is contained in:
zenghongyao 2025-04-15 15:49:22 +08:00
parent 6409afa98e
commit be076a81e5
8 changed files with 390 additions and 144 deletions

View File

@ -17,10 +17,13 @@ using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json;
using JiShe.CollectBus.Kafka;
namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
{
private readonly ILogger<SampleAppService> _logger;
private readonly IIoTDBProvider _iotDBProvider;
@ -181,9 +184,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
return aa == null;
}
//[AllowAnonymous]
//public async Task KafkaAsync()
//{
[KafkaSubscribe(["test-topic"])]
//}
public async Task KafkaSubscribeAsync(string obj)
{
_logger.LogWarning($"收到订阅消息: {obj}");
await Task.CompletedTask;
}
}

View File

@ -22,30 +22,32 @@ namespace JiShe.CollectBus.Kafka.Attributes
/// <summary>
/// 消费者组
/// </summary>
public string GroupId { get; set; } = "default";
public string GroupId { get; set; }
public KafkaSubscribeAttribute(string[] topics)
{
this.Topics = topics;
}
public KafkaSubscribeAttribute(string[] topics, string groupId)
public KafkaSubscribeAttribute(string[] topics, string groupId = "default")
{
this.Topics = topics;
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string[] topics, int partition)
public KafkaSubscribeAttribute(string topic, string groupId = "default")
{
this.Topics = topics;
this.Partition = partition;
this.Topics = new string[] { topic };
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId)
public KafkaSubscribeAttribute(string[] topics, int partition, string groupId = "default")
{
this.Topics = topics;
this.Partition = partition;
this.GroupId = groupId;
}
public KafkaSubscribeAttribute(string topic, int partition, string groupId = "default")
{
this.Topics = new string[] { topic };
this.Partition = partition;
this.GroupId = groupId;
}
}
}

View File

@ -17,9 +17,9 @@ namespace JiShe.CollectBus.Kafka
public override void ConfigureServices(ServiceConfigurationContext context)
{
// 注册Producer
context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
context.Services.AddTransient<IProducerService, ProducerService>();
// 注册Consumer
context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
context.Services.AddTransient<IConsumerService, ConsumerService>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)

View File

@ -6,108 +6,205 @@ using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
namespace JiShe.CollectBus.Kafka.Consumer
{
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
private CancellationTokenSource _cancellationTokenSource;
private readonly ILogger<ConsumerService> _logger;
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
protected ConsumerService(IConfiguration configuration, ILogger<ConsumerService<TKey, TValue>> logger)
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
_configuration = configuration;
_logger = logger;
GetInstance(configuration);
}
#region private
public IConsumer<TKey, TValue> Instance { get; set; } = default;
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
/// <summary>
/// 创建消费者
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IConsumer<TKey, TValue> CreateConsumer<TKey, TValue>(string? groupId = null) where TKey : notnull where TValue : class
{
var config = BuildConsumerConfig(groupId);
return new ConsumerBuilder<TKey, TValue>(config)
.SetErrorHandler((_, e) => _logger.LogError($"消费者错误: {e.Reason}"))
.Build();
}
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var consumerConfig = new ConsumerConfig
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ConsumerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"],
GroupId = groupId ?? "default",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
Acks = Acks.All, // 需要所有副本响应才算消费完成
EnableAutoCommit = false // 禁止AutoCommit
};
if (enableAuthorization)
if (enableAuth)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
}
Instance = new ConsumerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
public async Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler)
{
_cancellationTokenSource = new CancellationTokenSource();
Instance.Subscribe(topic);
return config;
}
#endregion
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
await SubscribeAsync<TKey, TValue>(new[] { topic }, messageHandler, groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class
{
await SubscribeAsync<TValue>(new[] { topic }, messageHandler,groupId);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId = null) where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
(
CreateConsumer<TKey, TValue>(groupId),
cts
)).Consumer as IConsumer<TKey, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
var result = consumer.Consume(cts.Token);
bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess)
{
var result = Instance.Consume(_cancellationTokenSource.Token);
if (result != null)
{
await messageHandler(result.Message.Key, result.Message.Value);
consumer.Commit(result); // 手动提交
}
}
}
catch (OperationCanceledException)
catch (ConsumeException ex)
{
Instance.Close();
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
}
});
await Task.CompletedTask;
}
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{
var consumerKey = typeof((Null, TValue));
var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
(
CreateConsumer<Null, TValue>(groupId),
cts
)).Consumer as IConsumer<Null, TValue>;
consumer!.Subscribe(topics);
_ = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
try
{
var result = consumer.Consume(cts.Token);
bool sucess = await messageHandler(result.Message.Value);
if (sucess)
consumer.Commit(result); // 手动提交
}
catch (ConsumeException ex)
{
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
}
}
});
await Task.CompletedTask;
}
/// <summary>
/// 取消消息订阅
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
public void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class
{
var consumerKey = typeof((TKey, TValue));
if (_consumerStore.TryRemove(consumerKey, out var entry))
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
}
/// <summary>
/// 订阅多个topic
/// 释放资源
/// </summary>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
public async Task SubscribeAsync(string[] topics, Func<TKey, TValue, Task> messageHandler)
{
_cancellationTokenSource = new CancellationTokenSource();
Instance.Subscribe(topics);
try
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
var result = Instance.Consume(_cancellationTokenSource.Token);
if (result != null)
{
await messageHandler(result.Message.Key, result.Message.Value);
}
}
}
catch (OperationCanceledException)
{
Instance.Close();
}
}
public void Unsubscribe()
{
_cancellationTokenSource?.Cancel();
Instance?.Unsubscribe();
}
public void Dispose()
{
Unsubscribe();
Instance?.Dispose();
_cancellationTokenSource?.Dispose();
foreach (var entry in _consumerStore.Values)
{
entry.CTS.Cancel();
(entry.Consumer as IDisposable)?.Dispose();
entry.CTS.Dispose();
}
_consumerStore.Clear();
}
}
}

View File

@ -6,18 +6,32 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Consumer
{
public interface IConsumerService<TKey, TValue>
public interface IConsumerService
{
Task SubscribeAsync(string topic, Func<TKey, TValue, Task> messageHandler);
void Unsubscribe();
void Dispose();
Task SubscribeAsync<TKey, TValue>(string topic, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId=null) where TKey : notnull where TValue : class;
/// <summary>
/// 订阅多个topic
/// 订阅消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
Task SubscribeAsync<TValue>(string topic, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
Task SubscribeAsync<TKey, TValue>(string[] topics, Func<TKey, TValue, Task<bool>> messageHandler, string? groupId) where TKey : notnull where TValue : class;
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topics"></param>
/// <param name="messageHandler"></param>
/// <returns></returns>
Task SubscribeAsync(string[] topics, Func<TKey, TValue, Task> messageHandler);
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
}
}

View File

@ -4,6 +4,8 @@ using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
@ -11,6 +13,7 @@ using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
@ -55,14 +58,13 @@ namespace JiShe.CollectBus.Kafka
/// <param name="provider"></param>
private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
{
var methods = subscribe.GetType().GetMethods();
foreach (var method in methods)
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
.Where(x => x.Attribute != null)
.ToArray();
foreach (var sub in subscribedMethods)
{
var attr = method.GetCustomAttribute<KafkaSubscribeAttribute>();
if (attr == null) continue;
// 启动后台消费线程
Task.Run(() => StartConsumerAsync(provider, attr, method, subscribe));
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
}
}
@ -76,43 +78,54 @@ namespace JiShe.CollectBus.Kafka
/// <returns></returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe)
{
var consumerService = provider.GetRequiredService<IConsumerService<Ignore, string>>();
await consumerService.SubscribeAsync(attr.Topics, async (key, message) =>
var consumerService = provider.GetRequiredService<IConsumerService>();
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
{
try
{
if (string.IsNullOrEmpty(message))
await Task.CompletedTask ;
// 处理消息
await ProcessMessageAsync(message, method, subscribe);
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
throw;
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe)
/// <summary>
/// 处理消息
/// </summary>
/// <param name="message"></param>
/// <param name="method"></param>
/// <param name="subscribe"></param>
/// <returns></returns>
private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
if (parameters.Length != 1) return;
if (parameters.Length != 1)
return true;
var paramType = parameters[0].ParameterType;
var messageObj = paramType == typeof(string)
? message
: JsonConvert.DeserializeObject(message, paramType);
var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType);
if (method.ReturnType == typeof(Task))
{
await (Task)method.Invoke(subscribe, new[] { messageObj })!;
object? result = await (Task<bool>)method.Invoke(subscribe, new[] { messageObj })!;
if (result is bool success)
return success;
}
else
{
method.Invoke(subscribe, new[] { messageObj });
object? result = method.Invoke(subscribe, new[] { messageObj });
if (result is bool success)
return success;
}
return false;
}
}

View File

@ -9,8 +9,12 @@ namespace JiShe.CollectBus.Kafka.Producer
{
public interface IProducerService
{
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value);
Task ProduceAsync<TKey,TValue>(string topic, TValue value);
void Dispose();
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class;
Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value, int? partition, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null) where TKey : notnull where TValue : class;
Task ProduceAsync<TValue>(string topic, TValue value, int? partition = null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class;
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -11,70 +12,180 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable
public class ProducerService: IProducerService, IDisposable
{
private readonly ILogger<ProducerService> _logger;
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Tuple<Type, Type>, object> _producerCache = new();
private readonly ILogger<ProducerService<TKey, TValue>> _logger;
protected ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
{
_configuration = configuration;
_logger = logger;
GetInstance(configuration);
}
public IProducer<TKey, TValue> Instance { get; set; } = default;
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
#region private
/// <summary>
/// 创建生产者实例
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <returns></returns>
private IProducer<TKey, TValue> GetProducer<TKey, TValue>()
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var consumerConfig = new ProducerConfig
var typeKey = Tuple.Create(typeof(TKey), typeof(TValue))!;
return (IProducer<TKey, TValue>)_producerCache.GetOrAdd(typeKey, _ =>
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
var config = BuildProducerConfig();
return new ProducerBuilder<TKey, TValue>(config)
.SetLogHandler((_, msg) => _logger.Log(ConvertLogLevel(msg.Level), msg.Message))
.Build();
});
}
/// <summary>
/// 配置
/// </summary>
/// <returns></returns>
private ProducerConfig BuildProducerConfig()
{
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ProducerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2097151, // 修改缓冲区最大为2GB默认为1GB
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32768, // 修改批次大小为32K
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 20, // 修改等待时间为20ms
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
};
if (enableAuthorization)
if (enableAuth)
{
consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
consumerConfig.SaslMechanism = SaslMechanism.Plain;
consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
Instance = new ProducerBuilder<TKey, TValue>(consumerConfig).Build();
return Instance;
return config;
}
public async Task ProduceAsync(string topic, TKey key, TValue value)
private static LogLevel ConvertLogLevel(SyslogLevel level) => level switch
{
SyslogLevel.Emergency => LogLevel.Critical,
SyslogLevel.Alert => LogLevel.Critical,
SyslogLevel.Critical => LogLevel.Critical,
SyslogLevel.Error => LogLevel.Error,
SyslogLevel.Warning => LogLevel.Warning,
SyslogLevel.Notice => LogLevel.Information,
SyslogLevel.Info => LogLevel.Information,
SyslogLevel.Debug => LogLevel.Debug,
_ => LogLevel.None
};
#endregion
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic, TKey key, TValue value)where TKey : notnull where TValue : class
{
var producer = GetProducer<TKey, TValue>();
await producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
}
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value) where TValue : class
{
var producer = GetProducer<Null, TValue>();
await producer.ProduceAsync(topic, new Message<Null, TValue> { Value = value });
}
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TKey, TValue>(string topic,TKey key,TValue value,int? partition=null, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)where TKey : notnull where TValue : class
{
var message = new Message<TKey, TValue>
{
Key = key,
Value = value
};
var producer = GetProducer<TKey, TValue>();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
await Instance.ProduceAsync(topic, message);
}
public async Task ProduceAsync(string topic, TValue value)
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="TValue"></typeparam>
/// <param name="topic"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="partition"></param>
/// <param name="deliveryHandler"></param>
/// <returns></returns>
public async Task ProduceAsync<TValue>(string topic, TValue value, int? partition=null, Action<DeliveryReport<Null, TValue>>? deliveryHandler = null) where TValue : class
{
var message = new Message<TKey, TValue>
var message = new Message<Null, TValue>
{
Value = value
};
await Instance.ProduceAsync(topic, message);
var producer = GetProducer<Null, TValue>();
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
producer.Produce(topicPartition, message, deliveryHandler);
}
else
{
producer.Produce(topic, message, deliveryHandler);
}
await Task.CompletedTask;
}
public void Dispose()
{
Instance?.Dispose();
foreach (var producer in _producerCache.Values.OfType<IDisposable>())
{
producer.Dispose();
}
_producerCache.Clear();
}
}
}