using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.AutoMapper;
using Volo.Abp.Modularity;
using Volo.Abp.Application;
using Volo.Abp.BackgroundWorkers;
using System.Threading.Tasks;
using Volo.Abp;
using System.Reflection;
using JiShe.CollectBus.FreeSql;
using System;
using JiShe.CollectBus.Common.Extensions;
using Volo.Abp.AspNetCore.Mvc.AntiForgery;
using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.Workers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.ScheduledMeterReading;
using AutoMapper.Configuration.Annotations;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.IoTDBProvider;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.Protocol.Contracts;
using System.Collections.Generic;
using Thrift;
using Microsoft.Extensions.Configuration;
using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Kafka;
using Volo.Abp.EventBus;
using Confluent.Kafka;

namespace JiShe.CollectBus;

/// <summary>
/// Application-layer module of CollectBus. It wires up the AutoMapper object mapper,
/// binds the Kafka related option sections from configuration, and on application
/// start-up registers the collect workers and triggers the ammeter cache initialization.
/// </summary>
/// <remarks>
/// NOTE(review): several generic calls in this class (<c>Configure</c>, <c>AddMaps</c>,
/// <c>GetValue</c>, <c>GetRequiredService</c>) carry no explicit type arguments as written
/// here. They are reproduced unchanged; confirm against the compiled source that the
/// type arguments were not lost.
/// </remarks>
[DependsOn(
    typeof(CollectBusDomainModule),
    typeof(CollectBusApplicationContractsModule),
    typeof(AbpDddApplicationModule),
    typeof(AbpAutoMapperModule),
    typeof(AbpBackgroundWorkersHangfireModule),
    typeof(CollectBusFreeRedisModule),
    typeof(CollectBusFreeSqlModule),
    typeof(AbpEventBusModule),
    typeof(AbpKafkaModule),
    typeof(CollectBusIoTDBModule)
)]
public class CollectBusApplicationModule : AbpModule
{
    /// <summary>
    /// Registers the AutoMapper object mapper and configures the options that are
    /// driven by the <c>Kafka</c> configuration section (consumer, producer and topic settings).
    /// </summary>
    /// <param name="context">Service configuration context supplying the service collection.</param>
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();

        context.Services.AddAutoMapperObjectMapper();
        Configure(options =>
        {
            options.AddMaps(validate: true);
        });

        // Bind option objects directly from the "Kafka" and "Kafka:EventBus" sections.
        Configure(configuration.GetSection("Kafka"));
        Configure(configuration.GetSection("Kafka:EventBus"));

        // Consumer settings are read from "Kafka:Consumer:*" when the consumer is configured.
        Configure(options =>
        {
            options.ConfigureConsumer = config =>
            {
                config.GroupId = configuration.GetValue("Kafka:Consumer:GroupId");
                config.EnableAutoCommit = configuration.GetValue("Kafka:Consumer:EnableAutoCommit");
            };
        });

        // Producer settings are read from "Kafka:Producer:*".
        Configure(options =>
        {
            options.ConfigureProducer = config =>
            {
                config.MessageTimeoutMs = configuration.GetValue("Kafka:Producer:MessageTimeoutMs");
                config.Acks = (Acks)configuration.GetValue("Kafka:Producer:Acks");
            };
        });

        // Topic creation settings are read from "Kafka:Topic:*".
        Configure(options =>
        {
            options.ConfigureTopic = specification =>
            {
                specification.ReplicationFactor = configuration.GetValue("Kafka:Topic:ReplicationFactor");
                specification.NumPartitions = configuration.GetValue("Kafka:Topic:NumPartitions");
            };
        });
    }

    /// <summary>
    /// Runs start-up work: initializes a required service, registers every concrete
    /// <see cref="ICollectWorker"/> found in this assembly as a background worker, and
    /// loads the ammeter cache data.
    /// </summary>
    /// <param name="context">Application initialization context supplying the service provider.</param>
    public override void OnApplicationInitialization(
        ApplicationInitializationContext context)
    {
        context.ServiceProvider.GetRequiredService().Initialize();

        // Only concrete types can be instantiated as workers, so interfaces and
        // abstract base classes are both excluded from the scan.
        var assembly = Assembly.GetExecutingAssembly();
        var workerTypes = assembly.GetTypes()
            .Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface && !t.IsAbstract)
            .ToList();

        foreach (var workerType in workerTypes)
        {
            // AddBackgroundWorkerAsync returns a Task; wait for it so registration has
            // completed (and any failure has surfaced) before start-up continues.
            // This override is synchronous, hence the blocking wait.
            context.AddBackgroundWorkerAsync(workerType).ConfigureAwait(false).GetAwaiter().GetResult();
        }

        var dbContext = context.ServiceProvider.GetRequiredService();

        // Initialize the meter information by default.
        dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();

        // Water meter cache initialization is currently disabled.
        //dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
    }
}