71 lines
2.6 KiB
C#

using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System.Reflection;
using Volo.Abp;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Modularity;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
    /// <summary>
    /// ABP module for the Kafka integration. It binds <see cref="KafkaOptionConfig"/>
    /// from configuration, registers the Kafka-related services as singletons, and
    /// invokes topic initialization and subscription during application startup.
    /// </summary>
    public class CollectBusKafkaModule : AbpModule
    {
        /// <summary>
        /// Binds the Kafka options and registers the producer, consumer, Polly pipeline
        /// and task scheduler in the service collection.
        /// </summary>
        /// <param name="context">Configuration context that exposes the service collection.</param>
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();

            // Populate the options from the configuration section named by CommonConst.Kafka.
            Configure<KafkaOptionConfig>(options =>
            {
                configuration.GetSection(CommonConst.Kafka).Bind(options);
            });

            // Register the producer.
            context.Services.AddSingleton<IProducerService, ProducerService>();

            // Register the consumer.
            context.Services.AddSingleton<IConsumerService, ConsumerService>();

            // Register the Polly pipeline.
            context.Services.AddSingleton<KafkaPollyPipeline>();

            // Register the task scheduler.
            context.Services.AddSingleton<KafkaTaskScheduler>();
        }

        /// <summary>
        /// Initializes the Kafka topics before application initialization runs.
        /// </summary>
        /// <param name="context">Application initialization context.</param>
        public override void OnPreApplicationInitialization(ApplicationInitializationContext context)
        {
            // NOTE(review): GetApplicationBuilder presumably needs an ASP.NET Core host to have
            // supplied the application builder — confirm before reusing this module in a non-web host.
            var app = context.GetApplicationBuilder();
            app.ApplicationServices.UseInitKafkaTopic();
        }

        /// <summary>
        /// Registers the Kafka subscribers once the application is being initialized.
        /// </summary>
        /// <param name="context">Application initialization context.</param>
        public override void OnApplicationInitialization(ApplicationInitializationContext context)
        {
            var app = context.GetApplicationBuilder();

            // Register the subscribers.
            app.ApplicationServices.UseKafkaSubscribe();
        }
    }
}