2025-04-14 09:29:12 +08:00

74 lines
2.3 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
/// <summary>
/// Kafka producer service that builds a Confluent <see cref="IProducer{TKey, TValue}"/>
/// from the <c>Kafka:*</c> configuration keys and exposes simple publish helpers.
/// </summary>
/// <typeparam name="TKey">Type of the message key.</typeparam>
/// <typeparam name="TValue">Type of the message value.</typeparam>
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
{
    private readonly ILogger<ProducerService<TKey, TValue>> _logger;

    /// <summary>
    /// Creates the service and immediately builds the underlying producer from
    /// <paramref name="configuration"/>.
    /// </summary>
    /// <param name="configuration">Configuration that supplies the <c>Kafka:*</c> keys.</param>
    /// <param name="logger">Logger stored for this service.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
    // The constructor is public so a dependency-injection container can activate the
    // type; containers look for public constructors, and a protected one made this
    // non-abstract class impossible to create except from a subclass.
    public ProducerService(IConfiguration configuration, ILogger<ProducerService<TKey, TValue>> logger)
    {
        _logger = logger;
        GetInstance(configuration);
    }

    /// <summary>
    /// The producer most recently built by <see cref="GetInstance"/>.
    /// </summary>
    public IProducer<TKey, TValue> Instance { get; set; } = default;

    /// <summary>
    /// Builds a new producer from configuration, stores it in <see cref="Instance"/>
    /// and returns it.
    /// </summary>
    /// <remarks>
    /// Keys read: <c>Kafka:BootstrapServers</c>, <c>Kafka:EnableAuthorization</c> and, when
    /// authorization is enabled, <c>Kafka:SaslUserName</c> and <c>Kafka:SaslPassword</c>.
    /// A missing or blank <c>Kafka:EnableAuthorization</c> is treated as <c>false</c>.
    /// NOTE(review): every call builds a new producer and overwrites <see cref="Instance"/>
    /// without disposing the previous one; <see cref="Dispose"/> only releases the current
    /// instance. Confirm whether callers are expected to call this more than once.
    /// </remarks>
    /// <param name="configuration">Configuration that supplies the <c>Kafka:*</c> keys.</param>
    /// <returns>The newly built producer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
    /// <exception cref="FormatException">
    /// <c>Kafka:EnableAuthorization</c> is present but is not a valid boolean.
    /// </exception>
    public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
    {
        if (configuration == null)
        {
            throw new ArgumentNullException(nameof(configuration));
        }

        // The indexer returns null for an absent key and bool.Parse(null) throws, so an
        // absent/blank flag means "off". A present but malformed value still throws,
        // so a typo cannot silently disable authentication.
        var authorizationSetting = configuration["Kafka:EnableAuthorization"];
        var enableAuthorization = !string.IsNullOrWhiteSpace(authorizationSetting)
            && bool.Parse(authorizationSetting);

        var producerConfig = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            AllowAutoCreateTopics = true
        };

        if (enableAuthorization)
        {
            producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
            producerConfig.SaslMechanism = SaslMechanism.Plain;
            producerConfig.SaslUsername = configuration["Kafka:SaslUserName"];
            producerConfig.SaslPassword = configuration["Kafka:SaslPassword"];
        }

        Instance = new ProducerBuilder<TKey, TValue>(producerConfig).Build();
        return Instance;
    }

    /// <summary>
    /// Publishes a keyed message to <paramref name="topic"/> and awaits the producer call.
    /// The delivery result is discarded.
    /// </summary>
    /// <param name="topic">Target topic name.</param>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    public async Task ProduceAsync(string topic, TKey key, TValue value)
    {
        var message = new Message<TKey, TValue>
        {
            Key = key,
            Value = value
        };
        await Instance.ProduceAsync(topic, message);
    }

    /// <summary>
    /// Publishes a message without setting its key to <paramref name="topic"/> and awaits
    /// the producer call. The delivery result is discarded.
    /// </summary>
    /// <param name="topic">Target topic name.</param>
    /// <param name="value">Message value.</param>
    public async Task ProduceAsync(string topic, TValue value)
    {
        var message = new Message<TKey, TValue>
        {
            Value = value
        };
        await Instance.ProduceAsync(topic, message);
    }

    /// <summary>
    /// Disposes the producer currently held in <see cref="Instance"/>, if any.
    /// </summary>
    public void Dispose()
    {
        Instance?.Dispose();
    }
}
}