From fa18377f107288f31731531a19fbd6d62ccda845 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 11:53:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JiShe.CollectBus.Host/appsettings.json | 4 +- .../CollectBusKafkaModule.cs | 58 +++++++------- .../KafkaSubcribesExtensions.cs | 76 +++++++++++++------ .../Abstracts/BaseProtocolPlugin.cs | 1 + 4 files changed, 86 insertions(+), 53 deletions(-) diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index f830481..7aa3947 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -86,8 +86,8 @@ "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "EnableFilter": true, "EnableAuthorization": false, - "SecurityProtocol": "SASL_PLAINTEXT", - "SaslMechanism": "PLAIN", + "SecurityProtocol": "SaslPlaintext", + "SaslMechanism": "Plain", "SaslUserName": "lixiao", "SaslPassword": "lixiao1980" //"Topic": { diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 48a8c35..80739a6 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -35,38 +35,38 @@ namespace JiShe.CollectBus.Kafka { var app = context.GetApplicationBuilder(); // 程序运行目录 - var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); - if (!string.IsNullOrWhiteSpace(assemblyPath)) - { - var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); - var kafkaSubscriberAssemblies = new List(); - foreach (var file in dllFiles) - { - try - { - // 跳过已加载的程序集 - var assemblyName = AssemblyName.GetAssemblyName(file); - var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() - .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); - var assembly = existingAssembly ?? Assembly.LoadFrom(file); + //var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); + //if (!string.IsNullOrWhiteSpace(assemblyPath)) + //{ + // var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); + // var kafkaSubscriberAssemblies = new List(); + // foreach (var file in dllFiles) + // { + // try + // { + // // 跳过已加载的程序集 + // var assemblyName = AssemblyName.GetAssemblyName(file); + // var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() + // .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); + // var assembly = existingAssembly ?? Assembly.LoadFrom(file); - // 检查程序集是否包含 IKafkaSubscribe 的实现类 - var hasSubscriber = assembly.GetTypes() - .Any(type => - typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口 - !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身 + // // 检查程序集是否包含 IKafkaSubscribe 的实现类 + // var hasSubscriber = assembly.GetTypes() + // .Any(type => + // typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口 + // !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身 - if (hasSubscriber) - { - kafkaSubscriberAssemblies.Add(assembly); - } - } - catch{} - app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray()); - } - } + // if (hasSubscriber) + // { + // kafkaSubscriberAssemblies.Add(assembly); + // } + // } + // catch{} + // app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray()); + // } + //} // 获取程序集 - //app.UseKafkaSubscribers(new[] { Assembly.Load("JiShe.CollectBus.Application")}); + app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 1f83540..4c69b6a 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -12,12 +12,47 @@ namespace JiShe.CollectBus.Kafka { public static class KafkaSubcribesExtensions { - /// - /// 添加Kafka订阅 - /// - /// - /// - public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys) + ///// + ///// 添加Kafka订阅 + ///// + ///// + ///// + //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys) + //{ + // var provider = app.ApplicationServices; + // var lifetime = provider.GetRequiredService(); + + // lifetime.ApplicationStarted.Register(() => + // { + // var logger = provider.GetRequiredService>(); + // int threadCount = 0; + // int topicCount = 0; + // foreach (Assembly assembly in assemblys) + // { + // var subscribeTypes = assembly.GetTypes() + // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) + // .ToList(); + + // if (subscribeTypes.Count == 0) return; + // foreach (var subscribeType in subscribeTypes) + // { + // var subscribes = provider.GetServices(subscribeType).ToList(); + // subscribes.ForEach(subscribe => { + + // if (subscribe is IKafkaSubscribe) + // { + // Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + // threadCount += tuple.Item1; + // topicCount += tuple.Item2; + // } + // }); + // } + // } + // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); + // }); + //} + + public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) { var provider = app.ApplicationServices; var lifetime = provider.GetRequiredService(); @@ -27,26 +62,23 @@ namespace JiShe.CollectBus.Kafka var logger = provider.GetRequiredService>(); int threadCount = 0; int topicCount = 0; - foreach (Assembly assembly in assemblys) - { - var subscribeTypes = assembly.GetTypes() + var subscribeTypes = assembly.GetTypes() .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) .ToList(); - if (subscribeTypes.Count == 0) return; - foreach (var subscribeType in subscribeTypes) - { - var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(subscribe => { + if (subscribeTypes.Count == 0) return; + foreach (var subscribeType in subscribeTypes) + { + var subscribes = provider.GetServices(subscribeType).ToList(); + subscribes.ForEach(subscribe => { - if (subscribe is IKafkaSubscribe) - { - Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); - threadCount += tuple.Item1; - topicCount += tuple.Item2; - } - }); - } + if (subscribe is IKafkaSubscribe) + { + Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + threadCount += tuple.Item1; + topicCount += tuple.Item2; + } + }); } logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); }); diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 159f0fc..52ef129 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -12,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using MassTransit; using DotNetCore.CAP; +using JiShe.CollectBus.Kafka.Producer; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts {