using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using System.Reflection; using Volo.Abp; using Volo.Abp.DependencyInjection; using Volo.Abp.Modularity; using static Confluent.Kafka.ConfigPropertyNames; namespace JiShe.CollectBus.Kafka { public class CollectBusKafkaModule : AbpModule { public override void ConfigureServices(ServiceConfigurationContext context) { var configuration = context.Services.GetConfiguration(); //var kafkaSection = configuration.GetSection(CommonConst.Kafka); //KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig (); //kafkaSection.Bind(kafkaOptionConfig); //if (configuration[CommonConst.ServerTagName] != null) //{ // kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!; //} //context.Services.AddSingleton(kafkaOptionConfig); //context.Services.Configure(context.Services.GetConfiguration().GetSection(CommonConst.Kafka)); Configure(options => { configuration.GetSection(CommonConst.Kafka).Bind(options); }); // 注册Producer context.Services.AddSingleton(); // 注册Consumer context.Services.AddSingleton(); // 注册Polly context.Services.AddSingleton(); //context.Services.AddHostedService(); } /// /// 在初始化之前,初始化Kafka Topic /// /// public override void OnPreApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); app.ApplicationServices.UseInitKafkaTopic(); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); // 注册Subscriber app.ApplicationServices.UseKafkaSubscribe(); // 获取程序集 //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); } } }