using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Modularity;
using static Confluent.Kafka.ConfigPropertyNames;

namespace JiShe.CollectBus.Kafka
{
    /// <summary>
    /// ABP module for the Kafka integration: binds the Kafka options from configuration,
    /// registers the producer/consumer services, and at application start hands every
    /// assembly that contains an <see cref="IKafkaSubscribe"/> implementation to
    /// <c>UseKafkaSubscribers</c>.
    /// </summary>
    public class CollectBusKafkaModule : AbpModule
    {
        /// <summary>
        /// Binds the "Kafka" configuration section to a <see cref="KafkaOptionConfig"/> instance,
        /// overrides its <c>ServerTagName</c> from the root-level "ServerTagName" key when that key
        /// is present, and registers the instance together with the producer/consumer services.
        /// </summary>
        /// <param name="context">Service configuration context supplied by ABP.</param>
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();
            var kafkaSection = configuration.GetSection("Kafka");

            KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig();
            kafkaSection.Bind(kafkaOptionConfig);

            // A root-level "ServerTagName" takes precedence over the value bound from the "Kafka" section.
            var serverTagName = configuration["ServerTagName"];
            if (serverTagName is not null)
            {
                kafkaOptionConfig.ServerTagName = serverTagName;
            }

            context.Services.AddSingleton(kafkaOptionConfig);

            // NOTE(review): the two registrations below carry no type arguments in the file as
            // reviewed; confirm the intended service/implementation types for each call.
            // Register Producer
            context.Services.AddSingleton();
            // Register Consumer
            context.Services.AddSingleton();
        }

        /// <summary>
        /// Scans the entry assembly's directory for assemblies containing
        /// <see cref="IKafkaSubscribe"/> implementations and registers them once via
        /// <c>UseKafkaSubscribers</c>. Nothing is registered when the directory cannot be determined.
        /// </summary>
        /// <param name="context">Application initialization context supplied by ABP.</param>
        public override void OnApplicationInitialization(ApplicationInitializationContext context)
        {
            var app = context.GetApplicationBuilder();

            // Directory the application is running from.
            var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
            if (string.IsNullOrWhiteSpace(assemblyPath))
            {
                return;
            }

            // Register once, after the scan has finished. Calling this inside the scan loop
            // would pass the already-discovered assemblies again for every remaining DLL.
            app.UseKafkaSubscribers(FindKafkaSubscriberAssemblies(assemblyPath));
        }

        /// <summary>
        /// Returns the assemblies backed by "*.dll" files in <paramref name="directory"/> that
        /// contain at least one concrete <see cref="IKafkaSubscribe"/> implementation.
        /// </summary>
        /// <param name="directory">Directory whose top-level "*.dll" files are inspected.</param>
        /// <returns>The matching assemblies, in the order their files were enumerated.</returns>
        private static Assembly[] FindKafkaSubscriberAssemblies(string directory)
        {
            var subscriberAssemblies = new List<Assembly>();

            foreach (var file in Directory.GetFiles(directory, "*.dll"))
            {
                try
                {
                    // Reuse the assembly if one with the same full name is already loaded;
                    // otherwise load it from the file.
                    var assemblyName = AssemblyName.GetAssemblyName(file);
                    var assembly = AppDomain.CurrentDomain.GetAssemblies()
                                       .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName)
                                   ?? Assembly.LoadFrom(file);

                    if (ContainsKafkaSubscriber(assembly))
                    {
                        subscriberAssemblies.Add(assembly);
                    }
                }
                catch
                {
                    // Deliberate best effort: a file that cannot be read as an assembly, or whose
                    // types cannot be enumerated, is skipped so it does not abort application start.
                }
            }

            return subscriberAssemblies.ToArray();
        }

        /// <summary>
        /// Tells whether <paramref name="assembly"/> declares a non-abstract, non-interface type
        /// assignable to <see cref="IKafkaSubscribe"/>.
        /// </summary>
        /// <param name="assembly">Assembly whose types are inspected.</param>
        /// <returns><c>true</c> when at least one such type exists; otherwise <c>false</c>.</returns>
        private static bool ContainsKafkaSubscriber(Assembly assembly)
        {
            return assembly.GetTypes()
                .Any(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) // implements the interface
                             && !type.IsAbstract
                             && !type.IsInterface); // exclude abstract classes and the interface itself
        }
    }
}