From 96e066376f9b08f00e35d119ac1df61f7894a0c6 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 13:01:26 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0kafka=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=8F=96=E7=A8=8B=E5=BA=8F=E6=89=80=E6=9C=89?= =?UTF-8?q?=E7=9A=84=E8=AE=A2=E9=98=85=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusKafkaModule.cs | 37 +------- .../IKafkaSubscribe.cs | 6 ++ .../KafkaSubcribesExtensions.cs | 92 +++++++++++-------- 3 files changed, 64 insertions(+), 71 deletions(-) diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 80739a6..0444412 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -34,39 +34,12 @@ namespace JiShe.CollectBus.Kafka public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); - // 程序运行目录 - //var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); - //if (!string.IsNullOrWhiteSpace(assemblyPath)) - //{ - // var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); - // var kafkaSubscriberAssemblies = new List(); - // foreach (var file in dllFiles) - // { - // try - // { - // // 跳过已加载的程序集 - // var assemblyName = AssemblyName.GetAssemblyName(file); - // var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() - // .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); - // var assembly = existingAssembly ?? Assembly.LoadFrom(file); - - // // 检查程序集是否包含 IKafkaSubscribe 的实现类 - // var hasSubscriber = assembly.GetTypes() - // .Any(type => - // typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口 - // !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身 - - // if (hasSubscriber) - // { - // kafkaSubscriberAssemblies.Add(assembly); - // } - // } - // catch{} - // app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray()); - // } - //} + + // 注册Subscriber + app.ApplicationServices.UseKafkaSubscribers(); + // 获取程序集 - app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); + //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs index 3ccea65..39e5789 100644 --- a/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs +++ b/src/JiShe.CollectBus.KafkaProducer/IKafkaSubscribe.cs @@ -6,6 +6,12 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka { + /// + /// Kafka订阅者 + /// + /// 订阅者需要继承此接口并需要依赖注入,并使用标记 + /// + /// public interface IKafkaSubscribe { } diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 4c69b6a..8f2974a 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,8 +1,10 @@ using Confluent.Kafka; +using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Mvc.ApplicationParts; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -12,45 +14,57 @@ namespace JiShe.CollectBus.Kafka { public static class KafkaSubcribesExtensions { - ///// - ///// 添加Kafka订阅 - ///// - ///// - ///// - //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys) - //{ - // var provider = app.ApplicationServices; - // var lifetime = provider.GetRequiredService(); + /// + /// 添加Kafka订阅 + /// + /// + /// + public static void UseKafkaSubscribers(this IServiceProvider provider) + { + var lifetime = provider.GetRequiredService(); - // lifetime.ApplicationStarted.Register(() => - // { - // var logger = provider.GetRequiredService>(); - // int threadCount = 0; - // int topicCount = 0; - // foreach (Assembly assembly in assemblys) - // { - // var subscribeTypes = assembly.GetTypes() - // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) - // .ToList(); - - // if (subscribeTypes.Count == 0) return; - // foreach (var subscribeType in subscribeTypes) - // { - // var subscribes = provider.GetServices(subscribeType).ToList(); - // subscribes.ForEach(subscribe => { - - // if (subscribe is IKafkaSubscribe) - // { - // Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); - // threadCount += tuple.Item1; - // topicCount += tuple.Item2; - // } - // }); - // } - // } - // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); - // }); - //} + lifetime.ApplicationStarted.Register(() => + { + var logger = provider.GetRequiredService>(); + int threadCount = 0; + int topicCount = 0; + var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); + if (string.IsNullOrWhiteSpace(assemblyPath)) + { + logger.LogInformation($"kafka订阅未能找到程序路径"); + return; + } + var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); + foreach (var file in dllFiles) + { + // 跳过已加载的程序集 + var assemblyName = AssemblyName.GetAssemblyName(file); + var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() + .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); + var assembly = existingAssembly ?? Assembly.LoadFrom(file); + // 实现IKafkaSubscribe接口 + var subscribeTypes = assembly.GetTypes().Where(type => + typeof(IKafkaSubscribe).IsAssignableFrom(type) && + !type.IsAbstract && !type.IsInterface).ToList(); ; + if (subscribeTypes.Count == 0) + continue; + foreach (var subscribeType in subscribeTypes) + { + var subscribes = provider.GetServices(subscribeType).ToList(); + subscribes.ForEach(subscribe => + { + if (subscribe!=null) + { + Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + threadCount += tuple.Item1; + topicCount += tuple.Item2; + } + }); + } + } + logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); + }); + } public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) { @@ -72,7 +86,7 @@ namespace JiShe.CollectBus.Kafka var subscribes = provider.GetServices(subscribeType).ToList(); subscribes.ForEach(subscribe => { - if (subscribe is IKafkaSubscribe) + if (subscribe != null) { Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); threadCount += tuple.Item1; From 4c5f7231bf7dcd267ab30ac8c4c214c40630a653 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 13:41:53 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=9C=AA=E5=A4=9A?= =?UTF-8?q?=E4=BD=99=E7=9A=84=E9=85=8D=E7=BD=AE=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../KafkaOptions.cs | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs deleted file mode 100644 index d946cc8..0000000 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaOptions.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Kafka -{ - public class KafkaOptions - { - public string BootstrapServers { get; set; } - public string GroupId { get; set; } - public Dictionary ProducerConfig { get; set; } = new(); - public Dictionary ConsumerConfig { get; set; } = new(); - public Dictionary AdminConfig { get; set; } = new(); - } -}