using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JiShe.CollectBus.Kafka.Internal;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.Kafka.AdminClient;

/// <summary>
/// Thin wrapper around a Confluent.Kafka <see cref="IAdminClient"/> that exposes
/// topic and partition management helpers (create, delete, list, existence checks).
/// </summary>
/// <remarks>
/// The class implements <see cref="ISingletonDependency"/>, the ABP marker interface
/// for singleton registration, and owns the admin client it builds: disposing the
/// service disposes the client.
/// </remarks>
public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
{
    /// <summary>Timeout used by the metadata queries of the list/exists/partition helpers.</summary>
    private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(10);

    /// <summary>Shorter timeout used by the <c>CheckTopicAsync</c> overloads.</summary>
    private static readonly TimeSpan TopicCheckTimeout = TimeSpan.FromSeconds(5);

    private readonly ILogger<AdminClientService> _logger;
    private readonly KafkaOptionConfig _kafkaOptionConfig;

    /// <summary>
    /// Initializes a new instance of the <see cref="AdminClientService"/> class and
    /// eagerly builds the underlying admin client.
    /// </summary>
    /// <param name="configuration">Application configuration. Not read by this class; kept so the constructor signature stays unchanged.</param>
    /// <param name="logger">Logger stored for this service.</param>
    /// <param name="kafkaOptionConfig">Kafka connection options (bootstrap servers and optional SASL credentials).</param>
    public AdminClientService(
        IConfiguration configuration,
        ILogger<AdminClientService> logger,
        IOptions<KafkaOptionConfig> kafkaOptionConfig)
    {
        _logger = logger;
        _kafkaOptionConfig = kafkaOptionConfig.Value;
        Instance = GetInstance();
    }

    /// <summary>
    /// Gets or sets the admin client used by every operation of this service.
    /// </summary>
    /// <value>The admin client built by <see cref="GetInstance"/> in the constructor.</value>
    public IAdminClient Instance { get; set; }

    /// <summary>
    /// Creates a Kafka topic unless it already exists.
    /// </summary>
    /// <param name="topic">Name of the topic to create.</param>
    /// <param name="numPartitions">Number of partitions for the new topic.</param>
    /// <param name="replicationFactor">Replication factor for the new topic.</param>
    /// <returns>A task that completes once the topic exists or has been created.</returns>
    /// <exception cref="CreateTopicsException">
    /// Creation failed for a reason other than the topic already existing.
    /// </exception>
    public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
    {
        try
        {
            if (await CheckTopicAsync(topic))
            {
                return;
            }

            await Instance.CreateTopicsAsync(new[]
            {
                new TopicSpecification
                {
                    Name = topic,
                    NumPartitions = numPartitions,
                    ReplicationFactor = replicationFactor
                }
            });
        }
        catch (CreateTopicsException e)
            when (e.Results.Count > 0 && e.Results[0].Error.Code == ErrorCode.TopicAlreadyExists)
        {
            // The topic appeared between the existence check and the create call
            // (for example another instance created it); that is the desired end state.
        }
    }

    /// <summary>
    /// Deletes a Kafka topic.
    /// </summary>
    /// <param name="topic">Name of the topic to delete.</param>
    /// <returns>A task that completes when the delete request has finished.</returns>
    public async Task DeleteTopicAsync(string topic)
    {
        await Instance.DeleteTopicsAsync(new[] { topic });
    }

    /// <summary>
    /// Gets the names of the topics reported by the cluster metadata.
    /// </summary>
    /// <returns>The topic names.</returns>
    public async Task<List<string>> ListTopicsAsync()
    {
        // GetMetadata is a synchronous call; the async signature is kept for callers.
        var metadata = Instance.GetMetadata(MetadataTimeout);
        return await Task.FromResult(metadata.Topics.Select(t => t.Topic).ToList());
    }

    /// <summary>
    /// Determines whether a Kafka topic exists.
    /// </summary>
    /// <param name="topic">Name of the topic to look for.</param>
    /// <returns><c>true</c> if the cluster metadata lists the topic; otherwise <c>false</c>.</returns>
    public async Task<bool> TopicExistsAsync(string topic)
    {
        // GetMetadata is a synchronous call; the async signature is kept for callers.
        var metadata = Instance.GetMetadata(MetadataTimeout);
        return await Task.FromResult(metadata.Topics.Any(t => t.Topic == topic));
    }

    /// <summary>
    /// Checks, for each requested partition id, whether the topic has that partition.
    /// </summary>
    /// <param name="topic">Name of the topic to inspect.</param>
    /// <param name="partitions">Partition ids to check.</param>
    /// <returns>
    /// A map from each requested partition id to <c>true</c> when the topic metadata
    /// contains it, otherwise <c>false</c>. Every id maps to <c>false</c> when no
    /// topic metadata is returned.
    /// </returns>
    public Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions)
    {
        var metadata = Instance.GetMetadata(topic, MetadataTimeout);
        if (metadata.Topics.Count == 0)
        {
            return partitions.ToDictionary(p => p, p => false);
        }

        var existingPartitions = metadata.Topics[0].Partitions
            .Select(p => p.PartitionId)
            .ToHashSet();
        return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
    }

    /// <summary>
    /// Checks whether the topic has the given partition.
    /// </summary>
    /// <param name="topic">Name of the topic to inspect.</param>
    /// <param name="targetPartition">Partition id to look for.</param>
    /// <returns><c>true</c> if the topic metadata contains the partition; otherwise <c>false</c>.</returns>
    public bool CheckPartitionsExist(string topic, int targetPartition)
    {
        var metadata = Instance.GetMetadata(topic, MetadataTimeout);
        if (metadata.Topics.Count == 0)
        {
            return false;
        }

        return metadata.Topics[0].Partitions.Any(p => p.PartitionId == targetPartition);
    }

    /// <summary>
    /// Gets the number of partitions of a topic.
    /// </summary>
    /// <param name="topic">Name of the topic to inspect.</param>
    /// <returns>The partition count, or <c>0</c> when no topic metadata is returned.</returns>
    public int GetTopicPartitionsNum(string topic)
    {
        var metadata = Instance.GetMetadata(topic, MetadataTimeout);
        if (metadata.Topics.Count == 0)
        {
            return 0;
        }

        return metadata.Topics[0].Partitions.Count;
    }

    /// <summary>
    /// Disposes the underlying admin client.
    /// </summary>
    public void Dispose()
    {
        Instance?.Dispose();
    }

    /// <summary>
    /// Builds a new admin client from the configured Kafka options.
    /// </summary>
    /// <returns>
    /// A new <see cref="IAdminClient"/>. SASL/PLAIN over plaintext is configured when
    /// <c>EnableAuthorization</c> is set in the options.
    /// </returns>
    public IAdminClient GetInstance()
    {
        var adminClientConfig = new AdminClientConfig
        {
            BootstrapServers = _kafkaOptionConfig.BootstrapServers
        };

        if (_kafkaOptionConfig.EnableAuthorization)
        {
            adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
            adminClientConfig.SaslMechanism = SaslMechanism.Plain;
            adminClientConfig.SaslUsername = _kafkaOptionConfig.SaslUserName;
            // Fixed: the password was previously populated from SaslUserName, which
            // makes authentication fail whenever the password differs from the user name.
            adminClientConfig.SaslPassword = _kafkaOptionConfig.SaslPassword;
        }

        return new AdminClientBuilder(adminClientConfig).Build();
    }

    /// <summary>
    /// Determines whether a Kafka topic exists.
    /// </summary>
    /// <param name="topic">Name of the topic to look for.</param>
    /// <returns><c>true</c> if the cluster metadata lists the topic; otherwise <c>false</c>.</returns>
    public async Task<bool> CheckTopicAsync(string topic)
    {
        var metadata = Instance.GetMetadata(TopicCheckTimeout);
        return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
    }

    /// <summary>
    /// Determines whether a Kafka topic exists, after validating the requested count
    /// against the number of brokers.
    /// </summary>
    /// <param name="topic">Name of the topic to look for.</param>
    /// <param name="numPartitions">
    /// Value compared against the broker count. Despite the parameter name, the
    /// original comment and the error message describe it as the replica count,
    /// which must not exceed the number of brokers.
    /// </param>
    /// <returns><c>true</c> if the cluster metadata lists the topic; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="numPartitions"/> is greater than the number of brokers.
    /// </exception>
    public async Task<bool> CheckTopicAsync(string topic, int numPartitions)
    {
        var metadata = Instance.GetMetadata(TopicCheckTimeout);
        if (numPartitions > metadata.Brokers.Count)
        {
            throw new ArgumentOutOfRangeException(
                nameof(numPartitions),
                $"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。");
        }

        return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
    }
}