using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.AdminClient
{
    /// <summary>
    /// Owns a Kafka <see cref="IAdminClient"/> built from the "Kafka:*" configuration keys
    /// and exposes topic management helpers (check, create, delete, list) on top of it.
    /// </summary>
    public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
    {
        /// <summary>Metadata timeout used by <see cref="CheckTopicAsync"/>.</summary>
        private static readonly TimeSpan CheckTopicTimeout = TimeSpan.FromSeconds(5);

        /// <summary>Metadata timeout used by <see cref="ListTopicsAsync"/> and <see cref="TopicExistsAsync"/>.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(10);

        private readonly ILogger<AdminClientService> _logger;

        /// <summary>
        /// Initializes a new instance of the <see cref="AdminClientService"/> class and
        /// immediately builds the admin client from <paramref name="configuration"/>.
        /// </summary>
        /// <param name="configuration">The configuration holding the "Kafka:*" settings.</param>
        /// <param name="logger">The logger.</param>
        /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
        public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        /// <summary>
        /// Gets or sets the admin client used by every operation of this service.
        /// </summary>
        /// <value>
        /// The admin client instance.
        /// </value>
        public IAdminClient Instance { get; set; }

        /// <summary>
        /// Builds a new admin client from configuration, stores it in <see cref="Instance"/>
        /// and returns it.
        /// </summary>
        /// <remarks>
        /// Reads "Kafka:BootstrapServers" and "Kafka:EnableAuthorization". When authorization
        /// is enabled the client is configured for SASL/PLAIN over <see cref="SecurityProtocol.SaslPlaintext"/>
        /// with "Kafka:SaslUserName" and "Kafka:SaslPassword". Every call creates a new client and
        /// overwrites <see cref="Instance"/>; a previously stored client is not disposed here.
        /// </remarks>
        /// <param name="configuration">The configuration.</param>
        /// <returns>The newly built admin client.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
        /// <exception cref="FormatException">"Kafka:EnableAuthorization" is set but is not a valid boolean.</exception>
        public IAdminClient GetInstance(IConfiguration configuration)
        {
            if (configuration == null)
            {
                throw new ArgumentNullException(nameof(configuration));
            }

            // A missing or blank flag means "authorization disabled" instead of failing with an
            // ArgumentNullException from bool.Parse; a malformed value still throws FormatException.
            var authorizationFlag = configuration["Kafka:EnableAuthorization"];
            var enableAuthorization = !string.IsNullOrWhiteSpace(authorizationFlag) && bool.Parse(authorizationFlag);

            var adminClientConfig = new AdminClientConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
            };

            if (enableAuthorization)
            {
                adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                adminClientConfig.SaslMechanism = SaslMechanism.Plain;
                adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new AdminClientBuilder(adminClientConfig).Build();
            return Instance;
        }

        /// <summary>
        /// Checks whether a topic with the given name is present in the cluster metadata
        /// (5 second metadata timeout).
        /// </summary>
        /// <param name="topic">The topic name, compared ordinally.</param>
        /// <returns>A task whose result is <c>true</c> when the topic exists.</returns>
        public Task<bool> CheckTopicAsync(string topic)
        {
            return QueryTopicNames(CheckTopicTimeout, names => ContainsTopic(names, topic));
        }

        /// <summary>
        /// Creates a topic. A topic that already exists is not treated as an error.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <param name="numPartitions">The number of partitions.</param>
        /// <param name="replicationFactor">The replication factor.</param>
        /// <returns>A task that completes when the create request has finished.</returns>
        /// <exception cref="CreateTopicsException">
        /// Creation failed for a reason other than the topic already existing.
        /// </exception>
        public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
        {
            var specification = new TopicSpecification
            {
                Name = topic,
                NumPartitions = numPartitions,
                ReplicationFactor = replicationFactor
            };

            try
            {
                await Instance.CreateTopicsAsync(new[] { specification });
            }
            catch (CreateTopicsException e) when (IsTopicAlreadyExists(e))
            {
                // Creating an existing topic is a no-op for callers; any other failure propagates.
                _logger?.LogDebug("Kafka topic {Topic} already exists; creation skipped.", topic);
            }
        }

        /// <summary>
        /// Deletes the given topic.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <returns>A task that completes when the delete request has finished.</returns>
        public async Task DeleteTopicAsync(string topic)
        {
            await Instance.DeleteTopicsAsync(new[] { topic });
        }

        /// <summary>
        /// Lists the names of all topics present in the cluster metadata
        /// (10 second metadata timeout).
        /// </summary>
        /// <returns>A task whose result is the list of topic names.</returns>
        public Task<List<string>> ListTopicsAsync()
        {
            return QueryTopicNames(MetadataTimeout, names => names.ToList());
        }

        /// <summary>
        /// Checks whether a topic with the given name is present in the cluster metadata
        /// (10 second metadata timeout).
        /// </summary>
        /// <param name="topic">The topic name, compared ordinally.</param>
        /// <returns>A task whose result is <c>true</c> when the topic exists.</returns>
        public Task<bool> TopicExistsAsync(string topic)
        {
            return QueryTopicNames(MetadataTimeout, names => ContainsTopic(names, topic));
        }

        /// <summary>
        /// Disposes the underlying admin client, if any.
        /// </summary>
        public void Dispose()
        {
            Instance?.Dispose();
        }

        /// <summary>
        /// Fetches cluster metadata and projects the topic names through <paramref name="query"/>.
        /// </summary>
        /// <remarks>
        /// <c>GetMetadata</c> is a synchronous call, so this blocks the caller for up to
        /// <paramref name="timeout"/>. Failures are returned as a faulted task rather than thrown
        /// synchronously, which is what callers awaiting the public methods observe.
        /// </remarks>
        private Task<TResult> QueryTopicNames<TResult>(TimeSpan timeout, Func<IEnumerable<string>, TResult> query)
        {
            try
            {
                var metadata = Instance.GetMetadata(timeout);
                return Task.FromResult(query(metadata.Topics.Select(t => t.Topic)));
            }
            catch (Exception ex)
            {
                return Task.FromException<TResult>(ex);
            }
        }

        /// <summary>Returns true when <paramref name="names"/> contains <paramref name="topic"/> (ordinal match).</summary>
        private static bool ContainsTopic(IEnumerable<string> names, string topic)
        {
            return names.Any(name => string.Equals(name, topic, StringComparison.Ordinal));
        }

        /// <summary>Returns true when every reported result failed only because the topic already exists.</summary>
        private static bool IsTopicAlreadyExists(CreateTopicsException exception)
        {
            return exception.Results.Count > 0
                && exception.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists);
        }
    }
}