From e672a6800d20145fa436c985c2b99698540f1ce5 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 15 Apr 2025 11:15:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4kafka=E7=BA=A6=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + .../Attributes/KafkaSubscribeAttribute.cs | 51 ++++++++ .../CollectBusKafkaModule.cs | 12 ++ .../Consumer/ConsumerService.cs | 28 +++++ .../Consumer/IConsumerService.cs | 8 ++ .../IKafkaSubscribe.cs | 12 ++ .../JiShe.CollectBus.Kafka.csproj | 3 +- .../KafkaSubcribesExtensions.cs | 119 ++++++++++++++++++ .../Producer/ProducerService.cs | 2 +- 9 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs diff --git a/.gitignore b/.gitignore index daafdc8..68f674a 100644 --- a/.gitignore +++ b/.gitignore @@ -401,3 +401,4 @@ FodyWeavers.xsd # ABP Studio **/.abpstudio/ /src/JiShe.CollectBus.Host/Plugins/*.dll +JiShe.CollectBus.Kafka.Test diff --git a/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs new file mode 100644 index 0000000..5f70b77 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/Attributes/KafkaSubscribeAttribute.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Attributes +{ + [AttributeUsage(AttributeTargets.Method)] + public class KafkaSubscribeAttribute : Attribute + { + /// + /// 订阅的主题 + /// + public string[] Topics { get; set; } + + /// + /// 分区 + /// + public int Partition { get; set; } = -1; + + /// + /// 消费者组 + /// + public string GroupId { get; set; } = "default"; + + public KafkaSubscribeAttribute(string[] topics) + { + this.Topics = topics; + } + + public KafkaSubscribeAttribute(string[] topics, string groupId) + { + this.Topics = topics; + this.GroupId = groupId; + } + + public KafkaSubscribeAttribute(string[] topics, int partition) + { + this.Topics = topics; + this.Partition = partition; + } + + public KafkaSubscribeAttribute(string[] topics, int partition, string groupId) + { + this.Topics = topics; + this.Partition = partition; + this.GroupId = groupId; + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index a73eb79..f0462b0 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -1,9 +1,14 @@ using Confluent.Kafka; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Producer; +using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using System.Reflection; +using Volo.Abp; +using Volo.Abp.DependencyInjection; using Volo.Abp.Modularity; +using static Confluent.Kafka.ConfigPropertyNames; namespace JiShe.CollectBus.Kafka { @@ -16,5 +21,12 @@ namespace JiShe.CollectBus.Kafka // 注册Consumer context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>)); } + + public override void OnApplicationInitialization(ApplicationInitializationContext context) + { + var app = context.GetApplicationBuilder(); + app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); + + } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 34d63fb..a7626dd 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -69,6 +69,34 @@ namespace JiShe.CollectBus.Kafka.Consumer } } + /// + /// 订阅多个topic + /// + /// + /// + /// + public async Task SubscribeAsync(string[] topics, Func messageHandler) + { + _cancellationTokenSource = new CancellationTokenSource(); + Instance.Subscribe(topics); + + try + { + while (!_cancellationTokenSource.Token.IsCancellationRequested) + { + var result = Instance.Consume(_cancellationTokenSource.Token); + if (result != null) + { + await messageHandler(result.Message.Key, result.Message.Value); + } + } + } + catch (OperationCanceledException) + { + Instance.Close(); + } + } + public void Unsubscribe() { _cancellationTokenSource?.Cancel(); diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs index bb88038..990bc86 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -11,5 +11,13 @@ namespace JiShe.CollectBus.Kafka.Consumer Task SubscribeAsync(string topic, Func messageHandler); void Unsubscribe(); void Dispose(); + + /// + /// 订阅多个topic + /// + /// + /// + /// + Task SubscribeAsync(string[] topics, Func messageHandler); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs new file mode 100644 index 0000000..3ccea65 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public interface IKafkaSubscribe + { + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj index 3175346..cef24d5 100644 --- a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj +++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj @@ -8,7 +8,8 @@ - + + diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs new file mode 100644 index 0000000..fede540 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -0,0 +1,119 @@ +using Confluent.Kafka; +using JiShe.CollectBus.Kafka.Attributes; +using JiShe.CollectBus.Kafka.Consumer; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public static class KafkaSubcribesExtensions + { + /// + /// 添加Kafka订阅 + /// + /// + /// + public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) + { + var subscribeTypes = assembly.GetTypes() + .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) + .ToList(); + + if (subscribeTypes.Count == 0) return; + + var provider = app.ApplicationServices; + var lifetime = provider.GetRequiredService(); + + lifetime.ApplicationStarted.Register(() => + { + foreach (var subscribeType in subscribeTypes) + { + var subscribes = provider.GetServices(subscribeType).ToList(); + subscribes.ForEach(subscribe => { + + if(subscribe is IKafkaSubscribe) + { + BuildKafkaSubscriber(subscribe, provider); + } + }); + } + }); + } + + /// + /// 构建Kafka订阅 + /// + /// + /// + private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider) + { + var methods = subscribe.GetType().GetMethods(); + foreach (var method in methods) + { + var attr = method.GetCustomAttribute(); + if (attr == null) continue; + + // 启动后台消费线程 + Task.Run(() => StartConsumerAsync(provider, attr, method, subscribe)); + } + } + + /// + /// 启动后台消费线程 + /// + /// + /// + /// + /// + /// + private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe) + { + var consumerService = provider.GetRequiredService>(); + await consumerService.SubscribeAsync(attr.Topics, async (key, message) => + { + try + { + if (string.IsNullOrEmpty(message)) + await Task.CompletedTask ; + + // 处理消息 + await ProcessMessageAsync(message, method, subscribe); + } + catch (ConsumeException ex) + { + // 处理消费错误 + throw; + } + }); + } + + private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe) + { + var parameters = method.GetParameters(); + if (parameters.Length != 1) return; + + var paramType = parameters[0].ParameterType; + var messageObj = paramType == typeof(string) + ? message + : JsonConvert.DeserializeObject(message, paramType); + + if (method.ReturnType == typeof(Task)) + { + await (Task)method.Invoke(subscribe, new[] { messageObj })!; + } + else + { + method.Invoke(subscribe, new[] { messageObj }); + } + } + + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index c3e65f6..99d3dd3 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -37,7 +37,7 @@ namespace JiShe.CollectBus.Kafka.Producer CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32768, // 修改批次大小为32K LingerMs = 20, // 修改等待时间为20ms - Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功 + Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader MessageSendMaxRetries = 50, // 消息发送失败最大重试50次 };