using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Mappers;
using JiShe.CollectBus.Protocol;
using JiShe.CollectBus.ScheduledMeterReading;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Channels;
using System.Threading.Tasks;
using Volo.Abp;
using Volo.Abp.Application;
using Volo.Abp.Autofac;
using Volo.Abp.AutoMapper;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.Modularity;

namespace JiShe.CollectBus;

// NOTE(review): several generic calls in this file carry no visible type arguments
// (Configure(...), AddMaps(...), GetCustomAttribute(), TryAdd(), CreateUnbounded...(),
// GetRequiredService()). They look like they were lost when this copy of the file was
// produced - restore them from the authoritative source before building.

/// <summary>
/// ABP module for the CollectBus application layer. It declares the modules it depends on,
/// registers mapping and interceptor services in <see cref="ConfigureServices"/>, and performs
/// start-up work (background workers, data channels, meter cache warm-up) in
/// <see cref="OnApplicationInitializationAsync"/>.
/// </summary>
[DependsOn(
    typeof(CollectBusDomainModule),
    typeof(CollectBusApplicationContractsModule),
    typeof(AbpDddApplicationModule),
    typeof(AbpAutoMapperModule),
    typeof(AbpAutofacModule),
    typeof(AbpBackgroundWorkersHangfireModule),
    typeof(CollectBusFreeRedisModule),
    typeof(CollectBusFreeSqlModule),
    typeof(CollectBusKafkaModule),
    typeof(CollectBusIoTDbModule),
    typeof(CollectBusCassandraModule),
    typeof(CollectBusProtocolModule)
)]
public class CollectBusApplicationModule : AbpModule
{
    /// <summary>
    /// Registers the AutoMapper object mapper, the Cassandra mapping configuration and an
    /// attribute-driven interceptor hook with the service collection.
    /// </summary>
    /// <param name="context">Service configuration context that exposes the service collection.</param>
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        // NOTE(review): 'configuration' is not used anywhere else in this method.
        var configuration = context.Services.GetConfiguration();

        context.Services.AddAutoMapperObjectMapper();

        // NOTE(review): the options type and the AddMaps type argument are not visible here;
        // the 'true' argument is presumably the "validate mapping configuration" flag - confirm.
        Configure(options => { options.AddMaps(true); });

        // A single shared MappingConfiguration instance, defined from CollectBusMapping.
        context.Services.AddSingleton(new MappingConfiguration()
            .Define(new CollectBusMapping()));

        // Register interceptors
        // For every registered service, inspect the methods returned by GetMethods() on the
        // implementation type; if any of them carries the marker attribute, add the interceptor.
        // NOTE(review): the attribute type (GetCustomAttribute) and the interceptor type (TryAdd)
        // are not visible in this copy.
        context.Services.OnRegistered(ctx =>
        {
            var methods = ctx.ImplementationType.GetMethods();
            var any = methods.Any(a => a.GetCustomAttribute() != null);
            if (any) ctx.Interceptors.TryAdd();
        });
    }

    /// <summary>
    /// Application-initialization hook: registers every <c>ICollectWorker</c> implementation found in
    /// this assembly as a background worker, creates the static data channels on
    /// <c>DataChannelManage</c>, starts the log-save consumer, and then awaits the ammeter and
    /// water-meter cache initialisation.
    /// </summary>
    /// <param name="context">Initialization context that exposes the application service provider.</param>
    /// <returns>A task that completes once both cache initialisation calls have completed.</returns>
    public override async Task OnApplicationInitializationAsync(
        ApplicationInitializationContext context)
    {
        var assembly = Assembly.GetExecutingAssembly();

        // Every non-interface type in this assembly that is assignable to ICollectWorker.
        // NOTE(review): abstract classes are not filtered out by this predicate.
        var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface)
            .ToList();
        foreach (var type in types) await context.AddBackgroundWorkerAsync(type);

        // Disabled alternative: ran the meter cache initialisation on a background task.
        //Task.Run(() =>
        //{
        //    // Initialise meter information by default
        //    var dbContext = context.ServiceProvider.GetRequiredService();
        //    dbContext.InitAmmeterCacheData();
        //    //await dbContext.InitWatermeterCacheData();
        //}).ConfigureAwait(false);

        // Build the channel for dispatched tasks
        // NOTE(review): the element type of this channel is not visible in this copy (only a
        // trailing ">>" remains), and no reader for it is started in this method.
        DataChannelManage.TaskDataChannel = Channel.CreateUnbounded>>();

        // Build the log storage channel
        DataChannelManage.LogSaveChannel = Channel.CreateUnbounded();

        // Build the log refresh channel
        // NOTE(review): no reader for this channel is started in this method.
        DataChannelManage.LogRefreshChannel = Channel.CreateUnbounded();

        // Start the channel task
        // The returned task is discarded, so this method does not wait for it and does not
        // observe its outcome.
        var _dataChannelManage = context.ServiceProvider.GetRequiredService();
        _ = _dataChannelManage.LogSaveAsync(DataChannelManage.LogSaveChannel.Reader);

        // Initialise meter information by default
        // Both calls are awaited in sequence, so initialization does not complete until they finish.
        // NOTE(review): the resolved service type is not visible here, and the meaning of the
        // "V4-Gather-8890" literal (passed to both calls) cannot be determined from this file -
        // confirm, and consider whether it belongs in configuration.
        var dbContext = context.ServiceProvider.GetRequiredService();
        await dbContext.InitAmmeterCacheData("V4-Gather-8890");
        await dbContext.InitWatermeterCacheData("V4-Gather-8890");
    }
}