112 lines
4.0 KiB
C#
112 lines
4.0 KiB
C#
using Confluent.Kafka;
|
|
using Microsoft.Extensions.Configuration;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using Confluent.Kafka.Admin;
|
|
using Microsoft.Extensions.Logging;
|
|
using Volo.Abp.DependencyInjection;
|
|
|
|
namespace JiShe.CollectBus.Kafka.AdminClient
|
|
{
|
|
public class AdminClientService : IAdminClientService, ISingletonDependency
|
|
{
|
|
|
|
private readonly ILogger<AdminClientService> _logger;
|
|
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="AdminClientService"/> class.
|
|
/// </summary>
|
|
/// <param name="configuration">The configuration.</param>
|
|
/// <param name="logger">The logger.</param>
|
|
public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
|
|
{
|
|
_logger = logger;
|
|
GetInstance(configuration);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets or sets the instance.
|
|
/// </summary>
|
|
/// <value>
|
|
/// The instance.
|
|
/// </value>
|
|
public IAdminClient Instance { get; set; } = default;
|
|
|
|
/// <summary>
|
|
/// Gets the instance.
|
|
/// </summary>
|
|
/// <param name="configuration">The configuration.</param>
|
|
/// <returns></returns>
|
|
public IAdminClient GetInstance(IConfiguration configuration)
|
|
{
|
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
|
var adminClientConfig = new AdminClientConfig()
|
|
{
|
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
|
};
|
|
if (enableAuthorization)
|
|
{
|
|
adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
|
|
adminClientConfig.SaslMechanism = SaslMechanism.Plain;
|
|
adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
|
|
adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
|
|
}
|
|
Instance = new AdminClientBuilder(adminClientConfig).Build();
|
|
return Instance;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Checks the topic asynchronous.
|
|
/// </summary>
|
|
/// <param name="topic">The topic.</param>
|
|
/// <returns></returns>
|
|
public async Task<bool> CheckTopicAsync(string topic)
|
|
{
|
|
var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
|
|
return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates the topic if not exist asynchronous.
|
|
/// </summary>
|
|
/// <param name="topicName">Name of the topic.</param>
|
|
/// <param name="factorNum">The factor number.</param>
|
|
/// <param name="partitionNum">The partition number.</param>
|
|
public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum)
|
|
{
|
|
try
|
|
{
|
|
if (await CheckTopicAsync(topicName)) return;
|
|
|
|
await Instance.CreateTopicsAsync(new[]
|
|
{
|
|
new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }
|
|
});
|
|
}
|
|
catch (CreateTopicsException e)
|
|
{
|
|
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Deletes the topic asynchronous.
|
|
/// </summary>
|
|
/// <param name="topicName">Name of the topic.</param>
|
|
public async Task DeleteTopicAsync(List<string> topicName)
|
|
{
|
|
try
|
|
{
|
|
await Instance.DeleteTopicsAsync(topicName, null);
|
|
}
|
|
catch (DeleteTopicsException e)
|
|
{
|
|
_logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
|
|
}
|
|
}
|
|
}
|
|
}
|