using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IoTDB;
using Volo.Abp;
using Volo.Abp.Application;
using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.EventBus;
using Volo.Abp.Modularity;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal;

namespace JiShe.CollectBus;

/// <summary>
/// Application-layer module of CollectBus. It registers the AutoMapper object
/// mapper and, at application initialization, adds the collect workers found in
/// this assembly as background workers, initializes the ammeter cache data and
/// creates the Kafka topics.
/// </summary>
/// <remarks>
/// NOTE(review): several generic type arguments are missing in the text this
/// file was reviewed from (<c>Configure(...)</c>, <c>AddMaps(...)</c> and the
/// <c>GetRequiredService()</c> calls). They are kept exactly as found and are
/// marked individually below; restore them from version control.
/// </remarks>
[DependsOn(
    typeof(CollectBusDomainModule),
    typeof(CollectBusApplicationContractsModule),
    typeof(AbpDddApplicationModule),
    typeof(AbpAutoMapperModule),
    typeof(AbpAutofacModule),
    typeof(AbpBackgroundWorkersHangfireModule),
    typeof(CollectBusFreeRedisModule),
    typeof(CollectBusFreeSqlModule),
    typeof(CollectBusKafkaModule),
    typeof(CollectBusIoTDBModule),
    typeof(CollectBusCassandraModule)
)]
public class CollectBusApplicationModule : AbpModule
{
    /// <summary>
    /// Registers the AutoMapper object mapper and adds this module's mapping
    /// profiles with configuration validation enabled.
    /// </summary>
    /// <param name="context">Service configuration context supplied by the framework.</param>
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddAutoMapperObjectMapper();

        // NOTE(review): the generic arguments of Configure(...) and AddMaps(...)
        // are missing here; presumably the AutoMapper options type and a type
        // from this assembly - confirm against the original file.
        Configure(options =>
        {
            options.AddMaps(validate: true);
        });
    }

    /// <summary>
    /// Runs the start-up work of the module: background worker registration,
    /// ammeter cache initialization and Kafka topic creation.
    /// </summary>
    /// <param name="context">Application initialization context supplied by the framework.</param>
    /// <returns>A task that completes when all start-up steps have finished.</returns>
    public override async Task OnApplicationInitializationAsync(
        ApplicationInitializationContext context)
    {
        // Collect every ICollectWorker implementation declared in this assembly.
        // Abstract classes are skipped as well as interfaces: they cannot be
        // instantiated, so they must not be registered as workers.
        var workerTypes = Assembly.GetExecutingAssembly()
            .GetTypes()
            .Where(t => typeof(ICollectWorker).IsAssignableFrom(t)
                        && !t.IsInterface
                        && !t.IsAbstract)
            .ToList();

        foreach (var workerType in workerTypes)
        {
            await context.AddBackgroundWorkerAsync(workerType);
        }

        // Initialize the meter information by default.
        // NOTE(review): generic argument of GetRequiredService is missing here.
        var dbContext = context.ServiceProvider.GetRequiredService();
        await dbContext.InitAmmeterCacheData();
        //await dbContext.InitWatermeterCacheData();

        // Initialize the topic information.
        // NOTE(review): generic arguments of both GetRequiredService calls are
        // missing here (the second one looks like a nested generic whose text
        // was cut) - restore from the original file.
        var kafkaAdminClient = context.ServiceProvider.GetRequiredService();
        var kafkaOptions = context.ServiceProvider.GetRequiredService>();

        // Topics for both directions (issued and received) are created with the
        // partition count and replication factor taken from the Kafka options.
        var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
        topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());

        foreach (var topic in topics)
        {
            await kafkaAdminClient.CreateTopicAsync(
                topic,
                kafkaOptions.Value.NumPartitions,
                kafkaOptions.Value.KafkaReplicationFactor);
        }
    }
}