From 0c06cc904da63c073d0be078b1de7d61ee477040 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Sat, 12 Apr 2025 15:11:18 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E4=BB=A3=E7=A0=81=E8=BF=98=E5=8E=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JiShe.CollectBus.Host/appsettings.json | 65 +++++++-------- .../AdminClient/AdminClientService.cs | 56 +++++++------ .../AdminClient/IAdminClientService.cs | 26 +----- .../CollectBusKafkaModule.cs | 10 --- .../Consumer/ConsumerBackgroundService.cs | 54 ------------- .../Consumer/ConsumerService.cs | 79 ++++++++++++------- .../Consumer/IConsumerService.cs | 5 +- .../KafkaOptions.cs | 17 ++++ .../Producer/IProducerService.cs | 6 +- .../Producer/ProducerBaseService.cs | 64 --------------- .../Producer/ProducerService.cs | 60 +++++++++++--- 11 files changed, 195 insertions(+), 247 deletions(-) delete mode 100644 src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs delete mode 100644 src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 2c72f7d..2cb4f66 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -81,43 +81,44 @@ "Port": 5672 } }, + "Kafka": { + "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", + "EnableAuthorization": false, + "SecurityProtocol": "SASL_PLAINTEXT", + "SaslMechanism": "PLAIN", + "SaslUserName": "lixiao", + "SaslPassword": "lixiao1980" + //"Topic": { + // "ReplicationFactor": 3, + // "NumPartitions": 1000 + //} + }, //"Kafka": { - // "BootstrapServers": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", - // "EnableAuthorization": false, - // "SecurityProtocol": "SASL_PLAINTEXT", - // "SaslMechanism": "PLAIN", - // "SaslUserName": "lixiao", - // "SaslPassword": "lixiao1980", + // "Connections": { + // "Default": { + // "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092" + // // "SecurityProtocol": "SASL_PLAINTEXT", + // // "SaslMechanism": "PLAIN", + // // "SaslUserName": "lixiao", + // // "SaslPassword": "lixiao1980", + // } + // }, + // "Consumer": { + // "GroupId": "JiShe.CollectBus" + // }, + // "Producer": { + // "MessageTimeoutMs": 6000, + // "Acks": -1 + // }, // "Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 + // }, + // "EventBus": { + // "GroupId": "JiShe.CollectBus", + // "TopicName": "DefaultTopicName" // } - "Kafka": { - "Connections": { - "Default": { - "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092" - // "SecurityProtocol": "SASL_PLAINTEXT", - // "SaslMechanism": "PLAIN", - // "SaslUserName": "lixiao", - // "SaslPassword": "lixiao1980", - } - }, - "Consumer": { - "GroupId": "JiShe.CollectBus" - }, - "Producer": { - "MessageTimeoutMs": 6000, - "Acks": -1 - }, - "Topic": { - "ReplicationFactor": 3, - "NumPartitions": 1000 - }, - "EventBus": { - "GroupId": "JiShe.CollectBus", - "TopicName": "DefaultTopicName" - } - }, + //}, "IoTDBOptions": { "UserName": "root", "Password": "root", diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs index e4b621b..e687276 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.Kafka.AdminClient { - public class AdminClientService : IAdminClientService, ISingletonDependency + public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency { private readonly ILogger _logger; @@ -69,43 +69,49 @@ namespace JiShe.CollectBus.Kafka.AdminClient return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); } - /// - /// Creates the topic if not exist asynchronous. - /// - /// Name of the topic. - /// The factor number. - /// The partition number. - public async Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum) + public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) { try { - if (await CheckTopicAsync(topicName)) return; - await Instance.CreateTopicsAsync(new[] { - new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum } + new TopicSpecification + { + Name = topic, + NumPartitions = numPartitions, + ReplicationFactor = replicationFactor + } }); } catch (CreateTopicsException e) { - _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) + { + throw; + } } } - /// - /// Deletes the topic asynchronous. - /// - /// Name of the topic. - public async Task DeleteTopicAsync(List topicName) + public async Task DeleteTopicAsync(string topic) { - try - { - await Instance.DeleteTopicsAsync(topicName, null); - } - catch (DeleteTopicsException e) - { - _logger.LogError($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); - } + await Instance.DeleteTopicsAsync(new[] { topic }); + } + + public async Task> ListTopicsAsync() + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return new List(metadata.Topics.Select(t => t.Topic)); + } + + public async Task TopicExistsAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return metadata.Topics.Any(t => t.Topic == topic); + } + + public void Dispose() + { + Instance?.Dispose(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs index bdb2d07..c3d332d 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs @@ -8,27 +8,9 @@ namespace JiShe.CollectBus.Kafka.AdminClient { public interface IAdminClientService { - /// - /// Checks the topic asynchronous. - /// - /// The topic. - /// - Task CheckTopicAsync(string topic); - - /// - /// Creates the topic if not exist asynchronous. - /// - /// Name of the topic. - /// The factor number. - /// The partition number. - /// - Task CreateTopicIfNotExistAsync(string topicName, short factorNum, int partitionNum); - - /// - /// Deletes the topic asynchronous. - /// - /// Name of the topic. - /// - Task DeleteTopicAsync(List topicName); + Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor); + Task DeleteTopicAsync(string topic); + Task> ListTopicsAsync(); + Task TopicExistsAsync(string topic); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 44ddcee..61cc788 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -10,16 +10,6 @@ namespace JiShe.CollectBus.Kafka { public override void ConfigureServices(ServiceConfigurationContext context) { - var configuration = context.Services.GetConfiguration(); - - // 注册 Kafka 生产者 - context.Services.AddSingleton>(sp => - new ProducerBuilder( - configuration.GetSection("Kafka:ProducerConfig").Get() - ) - .Build()); - // 注册后台服务 - context.Services.AddHostedService(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs deleted file mode 100644 index e49a09e..0000000 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerBackgroundService.cs +++ /dev/null @@ -1,54 +0,0 @@ -using Confluent.Kafka; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Kafka.Consumer -{ - public class ConsumerBackgroundService : BackgroundService - { - private readonly ConsumerService _consumerService; - private readonly ILogger _logger; - - public ConsumerBackgroundService( - ConsumerService consumerService, - ILogger logger) - { - _consumerService = consumerService; - _logger = logger; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _consumerService.Subscribe("abp-kafka-topic"); - - while (!stoppingToken.IsCancellationRequested) - { - try - { - var result = _consumerService.Consume(stoppingToken); - await ProcessMessageAsync(result.Message.Value); - _consumerService.Commit(result); - } - catch (ConsumeException ex) - { - _logger.LogError(ex, $"Message consume error: {ex.Error.Reason}"); - } - } - } - - private async Task ProcessMessageAsync(string message) - { - // 使用 ABP 的异步处理机制 - await Task.Run(() => - { - _logger.LogInformation($"Processing message: {message}"); - // 这里可以触发 ABP 的领域事件 - }); - } - } -} diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 391a7fc..37efe3a 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -5,54 +5,77 @@ using Microsoft.Extensions.Logging; using JiShe.CollectBus.Kafka.Attributes; using Volo.Abp.DependencyInjection; using JiShe.CollectBus.Kafka.AdminClient; +using static Confluent.Kafka.ConfigPropertyNames; namespace JiShe.CollectBus.Kafka.Consumer { - public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency + public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency { - private readonly ILogger _logger; - private readonly IConsumer _consumer; + private readonly ILogger> _logger; + private CancellationTokenSource _cancellationTokenSource; - public ConsumerService( - ILogger logger, - IConfiguration configuration) + protected ConsumerService(IConfiguration configuration, ILogger> logger) { _logger = logger; - - var consumerConfig = configuration.GetSection("Kafka:ConsumerConfig") - .Get(); - - _consumer = new ConsumerBuilder(consumerConfig) - .SetErrorHandler(OnConsumeError) - .Build(); + GetInstance(configuration); } - public void Subscribe(string topic) + + public IConsumer Instance { get; set; } = default; + + public IConsumer GetInstance(IConfiguration configuration) { - _consumer.Subscribe(topic); - _logger.LogInformation($"Subscribed to topic: {topic}"); + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]); + var consumerConfig = new ConsumerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + if (enableAuthorization) + { + consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + consumerConfig.SaslMechanism = SaslMechanism.Plain; + consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + Instance = new ConsumerBuilder(consumerConfig).Build(); + return Instance; } - public ConsumeResult Consume(CancellationToken cancellationToken) + public async Task SubscribeAsync(string topic, Func messageHandler) { - return _consumer.Consume(cancellationToken); + _cancellationTokenSource = new CancellationTokenSource(); + Instance.Subscribe(topic); + + try + { + while (!_cancellationTokenSource.Token.IsCancellationRequested) + { + var result = Instance.Consume(_cancellationTokenSource.Token); + if (result != null) + { + await messageHandler(result.Message.Key, result.Message.Value); + } + } + } + catch (OperationCanceledException) + { + Instance.Close(); + } } - public void Commit(ConsumeResult result) + public void Unsubscribe() { - _consumer.Commit(result); - _logger.LogDebug($"Committed offset: {result.TopicPartitionOffset}"); - } - - private void OnConsumeError(IConsumer consumer, Error error) - { - _logger.LogError($"Kafka consumer error: {error.Reason}"); + _cancellationTokenSource?.Cancel(); + Instance?.Unsubscribe(); } public void Dispose() { - _consumer?.Close(); - _consumer?.Dispose(); + Unsubscribe(); + Instance?.Dispose(); + _cancellationTokenSource?.Dispose(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs index 1319957..bb88038 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -6,7 +6,10 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Consumer { - public interface IConsumerService + public interface IConsumerService { + Task SubscribeAsync(string topic, Func messageHandler); + void Unsubscribe(); + void Dispose(); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs new file mode 100644 index 0000000..d946cc8 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public class KafkaOptions + { + public string BootstrapServers { get; set; } + public string GroupId { get; set; } + public Dictionary ProducerConfig { get; set; } = new(); + public Dictionary ConsumerConfig { get; set; } = new(); + public Dictionary AdminConfig { get; set; } = new(); + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs index 2c46bf4..2ceaed5 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/IProducerService.cs @@ -7,8 +7,10 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Producer { - public interface IProducerService + public interface IProducerService { - Task ProduceAsync(string topic, string message); + Task ProduceAsync(string topic, TKey key, TValue value); + Task ProduceAsync(string topic, TValue value); + void Dispose(); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs deleted file mode 100644 index cbe56a2..0000000 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerBaseService.cs +++ /dev/null @@ -1,64 +0,0 @@ -using Confluent.Kafka; -using Microsoft.Extensions.Configuration; -using Volo.Abp.DependencyInjection; - -namespace JiShe.CollectBus.Kafka.Producer -{ - public class ProducerBaseService - { - /// - /// Initializes a new instance of the class. - /// - /// The configuration. - public ProducerBaseService(IConfiguration configuration) - { - GetInstance(configuration); - } - - /// - /// Gets or sets the instance. - /// - /// - /// The instance. - /// - public IProducer Instance { get; set; } = default; - - /// - /// Gets the instance. - /// - /// The configuration. - /// - public IProducer GetInstance(IConfiguration configuration) - { - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]); - var producerConfig = new ProducerConfig - { - BootstrapServers = configuration["Kafka:BootstrapServers"], - AllowAutoCreateTopics = true, - }; - - if (enableAuthorization) - { - producerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; - producerConfig.SaslMechanism = SaslMechanism.Plain; - producerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - producerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; - } - Instance = new ProducerBuilder(producerConfig).Build(); - return Instance; - } - - /// - /// Produces the asynchronous. - /// - /// The topic. - /// The message. - /// The cancellation token. - public async Task ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) - { - await Instance.ProduceAsync(topic, message, cancellationToken); - } - - - } -} diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 95c9a15..a927716 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -4,28 +4,70 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Confluent.Kafka; +using JiShe.CollectBus.Kafka.Consumer; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.Kafka.Producer { - public class ProducerService: IProducerService,ITransientDependency + public class ProducerService : IProducerService, IDisposable,ITransientDependency { - private readonly IProducer _producer; + private readonly ILogger> _logger; - public ProducerService(IProducer producer) + protected ProducerService(IConfiguration configuration, ILogger> logger) { - _producer = producer; + _logger = logger; + GetInstance(configuration); } - public async Task ProduceAsync(string topic, string message) + + public IProducer Instance { get; set; } = default; + + public IProducer GetInstance(IConfiguration configuration) { - await _producer.ProduceAsync(topic, new Message + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]); + var consumerConfig = new ProducerConfig { - Key = null, - Value = message - }); + BootstrapServers = configuration["Kafka:BootstrapServers"], + AllowAutoCreateTopics = true + }; + + if (enableAuthorization) + { + consumerConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + consumerConfig.SaslMechanism = SaslMechanism.Plain; + consumerConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + consumerConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + Instance = new ProducerBuilder(consumerConfig).Build(); + return Instance; + } + public async Task ProduceAsync(string topic, TKey key, TValue value) + { + var message = new Message + { + Key = key, + Value = value + }; + + await Instance.ProduceAsync(topic, message); + } + + public async Task ProduceAsync(string topic, TValue value) + { + var message = new Message + { + Value = value + }; + + await Instance.ProduceAsync(topic, message); + } + + public void Dispose() + { + Instance?.Dispose(); } } }