using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.AdminClient
{
    /// <summary>
    /// Singleton wrapper around a Confluent <see cref="IAdminClient"/> that exposes
    /// topic and partition management helpers (existence checks, create, delete, list).
    /// </summary>
    public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
    {
        /// <summary>Metadata timeout used by the two <c>CheckTopicAsync</c> overloads.</summary>
        private static readonly TimeSpan TopicCheckTimeout = TimeSpan.FromSeconds(5);

        /// <summary>Metadata timeout used by every other metadata lookup in this class.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(10);

        private readonly ILogger<AdminClientService> _logger;

        /// <summary>
        /// Initializes a new instance of the <see cref="AdminClientService"/> class and
        /// immediately builds the underlying admin client from <paramref name="configuration"/>.
        /// </summary>
        /// <param name="configuration">The configuration holding the <c>Kafka:*</c> settings.</param>
        /// <param name="logger">The logger.</param>
        public AdminClientService(IConfiguration configuration, ILogger<AdminClientService> logger)
        {
            _logger = logger;
            GetInstance(configuration);
        }

        /// <summary>
        /// Gets or sets the admin client used by every operation of this service.
        /// </summary>
        /// <value>The admin client; assigned by <see cref="GetInstance"/> during construction.</value>
        public IAdminClient Instance { get; set; } = default!;

        /// <summary>
        /// Builds a new admin client from configuration, stores it in <see cref="Instance"/>
        /// and returns it.
        /// </summary>
        /// <remarks>
        /// Reads <c>Kafka:EnableAuthorization</c> (required, parsed as a boolean) and
        /// <c>Kafka:BootstrapServers</c>. When authorization is enabled, SASL/PLAIN over
        /// plaintext is configured from <c>Kafka:SaslUserName</c> and <c>Kafka:SaslPassword</c>.
        /// NOTE(review): every call builds a new client and overwrites <see cref="Instance"/>
        /// without disposing the previous one; only the client held at <see cref="Dispose"/>
        /// time is released.
        /// </remarks>
        /// <param name="configuration">The configuration holding the <c>Kafka:*</c> settings.</param>
        /// <returns>The newly built admin client.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="configuration"/> or the <c>Kafka:EnableAuthorization</c> value is null.
        /// </exception>
        /// <exception cref="ArgumentException"><c>Kafka:EnableAuthorization</c> is empty or whitespace.</exception>
        /// <exception cref="FormatException"><c>Kafka:EnableAuthorization</c> is not a valid boolean.</exception>
        public IAdminClient GetInstance(IConfiguration configuration)
        {
            ArgumentNullException.ThrowIfNull(configuration);

            const string enableAuthorizationKey = "Kafka:EnableAuthorization";
            var enableAuthorizationValue = configuration[enableAuthorizationKey];

            // Pass the key explicitly so the exception names the missing setting
            // instead of the local variable.
            ArgumentException.ThrowIfNullOrWhiteSpace(enableAuthorizationValue, enableAuthorizationKey);
            var enableAuthorization = bool.Parse(enableAuthorizationValue);

            var adminClientConfig = new AdminClientConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
            };

            if (enableAuthorization)
            {
                adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                adminClientConfig.SaslMechanism = SaslMechanism.Plain;
                adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
                adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
            }

            Instance = new AdminClientBuilder(adminClientConfig).Build();
            return Instance;
        }

        /// <summary>
        /// Checks whether a topic exists in the cluster metadata.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <returns><c>true</c> when the cluster metadata lists <paramref name="topic"/>.</returns>
        public Task<bool> CheckTopicAsync(string topic)
            => FromBlockingCall(() => TopicExists(topic, TopicCheckTimeout));

        /// <summary>
        /// Checks whether a topic exists, after validating <paramref name="numPartitions"/>
        /// against the number of brokers in the cluster.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <param name="numPartitions">
        /// Value that must not exceed the broker count. NOTE(review): the original comment
        /// and the exception message describe this as a replica count despite the parameter
        /// name; confirm the intent with callers.
        /// </param>
        /// <returns><c>true</c> when the cluster metadata lists <paramref name="topic"/>.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="numPartitions"/> is greater than the number of brokers.
        /// </exception>
        public Task<bool> CheckTopicAsync(string topic, int numPartitions)
            => FromBlockingCall(() =>
            {
                // One metadata fetch serves both the broker-count check and the topic lookup.
                var metadata = Instance.GetMetadata(TopicCheckTimeout);
                if (numPartitions > metadata.Brokers.Count)
                {
                    throw new ArgumentOutOfRangeException(
                        nameof(numPartitions),
                        numPartitions,
                        $"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。");
                }

                return metadata.Topics.Exists(t => t.Topic == topic);
            });

        /// <summary>
        /// Creates a topic unless it already exists.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <param name="numPartitions">The number of partitions for the new topic.</param>
        /// <param name="replicationFactor">The replication factor; must not exceed the broker count.</param>
        /// <returns>A task that completes when the topic exists or has been created.</returns>
        /// <exception cref="CreateTopicsException">
        /// Creation failed for a reason other than the topic already existing.
        /// </exception>
        public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
        {
            if (await CheckTopicAsync(topic))
            {
                return;
            }

            var specification = new TopicSpecification
            {
                Name = topic,
                NumPartitions = numPartitions,
                ReplicationFactor = replicationFactor,
            };

            try
            {
                await Instance.CreateTopicsAsync(new[] { specification });
            }
            catch (CreateTopicsException e) when (
                e.Results.Count > 0 &&
                e.Results.All(r => r.Error.Code == ErrorCode.TopicAlreadyExists))
            {
                // Another creator won the race between the existence check and the
                // create call; the topic exists, which is the outcome we wanted.
            }
        }

        /// <summary>
        /// Deletes a topic.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <returns>A task that completes when the delete request has finished.</returns>
        public async Task DeleteTopicAsync(string topic)
        {
            await Instance.DeleteTopicsAsync(new[] { topic });
        }

        /// <summary>
        /// Lists the names of all topics reported by the cluster metadata.
        /// </summary>
        /// <returns>The topic names.</returns>
        public Task<List<string>> ListTopicsAsync()
            => FromBlockingCall(() =>
                Instance.GetMetadata(MetadataTimeout).Topics.Select(t => t.Topic).ToList());

        /// <summary>
        /// Checks whether a topic exists in the cluster metadata.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <returns><c>true</c> when the cluster metadata lists <paramref name="topic"/>.</returns>
        public Task<bool> TopicExistsAsync(string topic)
            => FromBlockingCall(() => TopicExists(topic, MetadataTimeout));

        /// <summary>
        /// Reports, for each requested partition id, whether the topic has that partition.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <param name="partitions">The partition ids to look up; duplicates are collapsed.</param>
        /// <returns>
        /// A map from each requested partition id to <c>true</c> when the topic metadata
        /// contains it. Every id maps to <c>false</c> when no topic metadata is returned.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="partitions"/> is null.</exception>
        public Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions)
        {
            ArgumentNullException.ThrowIfNull(partitions);

            var existingPartitions = GetPartitions(topic).Select(p => p.PartitionId).ToHashSet();

            // Distinct() keeps a repeated id from making ToDictionary throw on a duplicate key.
            return partitions.Distinct().ToDictionary(p => p, p => existingPartitions.Contains(p));
        }

        /// <summary>
        /// Checks whether the topic has the given partition.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <param name="targetPartition">The partition id to look for.</param>
        /// <returns><c>true</c> when the topic metadata contains the partition.</returns>
        public bool CheckPartitionsExist(string topic, int targetPartition)
            => GetPartitions(topic).Any(p => p.PartitionId == targetPartition);

        /// <summary>
        /// Gets the number of partitions of a topic.
        /// </summary>
        /// <param name="topic">The topic name.</param>
        /// <returns>The partition count, or 0 when no topic metadata is returned.</returns>
        public int GetTopicPartitionsNum(string topic)
            => GetPartitions(topic).Count;

        /// <summary>
        /// Disposes the current admin client, if any.
        /// </summary>
        public void Dispose()
        {
            Instance?.Dispose();
        }

        /// <summary>
        /// Runs a synchronous call and wraps its outcome in a task.
        /// </summary>
        /// <remarks>
        /// The metadata lookups above call the synchronous <c>GetMetadata</c>, so the work
        /// runs on the caller's thread. Exceptions are captured into the returned task so
        /// callers observe them on <c>await</c>, as they would from an <c>async</c> method.
        /// </remarks>
        private static Task<T> FromBlockingCall<T>(Func<T> call)
        {
            try
            {
                return Task.FromResult(call());
            }
            catch (Exception ex)
            {
                return Task.FromException<T>(ex);
            }
        }

        /// <summary>Returns whether the cluster-wide metadata lists the given topic.</summary>
        private bool TopicExists(string topic, TimeSpan timeout)
            => Instance.GetMetadata(timeout).Topics.Exists(t => t.Topic == topic);

        /// <summary>
        /// Fetches the partition metadata of a single topic; empty when no topic entry is returned.
        /// </summary>
        private List<PartitionMetadata> GetPartitions(string topic)
        {
            var metadata = Instance.GetMetadata(topic, MetadataTimeout);
            return metadata.Topics.Count == 0
                ? new List<PartitionMetadata>()
                : metadata.Topics[0].Partitions;
        }
    }
}