JiShe.CollectBus/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs

123 lines
4.5 KiB
C#
Raw Normal View History

2024-12-19 16:07:07 +08:00
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.AutoMapper;
using Volo.Abp.Modularity;
using Volo.Abp.Application;
using Volo.Abp.BackgroundWorkers;
using System.Threading.Tasks;
using Volo.Abp;
using System.Reflection;
using JiShe.CollectBus.FreeSql;
using System;
2025-03-21 14:30:11 +08:00
using JiShe.CollectBus.Common.Extensions;
2024-12-19 16:07:07 +08:00
using Volo.Abp.AspNetCore.Mvc.AntiForgery;
using JiShe.CollectBus.FreeRedisProvider;
2025-03-14 13:45:29 +08:00
using JiShe.CollectBus.Workers;
using Volo.Abp.BackgroundWorkers.Hangfire;
2025-03-20 16:40:27 +08:00
using JiShe.CollectBus.MongoDB;
2025-03-21 11:48:31 +08:00
using JiShe.CollectBus.ScheduledMeterReading;
2025-03-21 14:30:11 +08:00
using AutoMapper.Configuration.Annotations;
using JiShe.CollectBus.Common.Attributes;
2025-04-02 17:46:33 +08:00
using JiShe.CollectBus.IoTDBProvider;
2025-04-07 17:35:37 +08:00
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.Protocol.Contracts;
using System.Collections.Generic;
using Thrift;
using Microsoft.Extensions.Configuration;
2025-04-09 14:33:20 +08:00
using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Kafka;
using Volo.Abp.EventBus;
using Confluent.Kafka;
2025-04-11 17:43:19 +08:00
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Common.Consts;
2024-12-19 16:07:07 +08:00
namespace JiShe.CollectBus;
[DependsOn(
typeof(CollectBusDomainModule),
typeof(CollectBusApplicationContractsModule),
typeof(AbpDddApplicationModule),
typeof(AbpAutoMapperModule),
2025-03-14 13:45:29 +08:00
typeof(AbpBackgroundWorkersHangfireModule),
2025-03-17 08:35:19 +08:00
typeof(CollectBusFreeRedisModule),
2025-04-09 14:33:20 +08:00
typeof(CollectBusFreeSqlModule),
typeof(AbpEventBusModule),
2025-04-10 17:06:20 +08:00
typeof(AbpKafkaModule),
typeof(CollectBusIoTDBModule)
2024-12-19 16:07:07 +08:00
)]
public class CollectBusApplicationModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
2025-04-09 14:33:20 +08:00
var configuration = context.Services.GetConfiguration();
2024-12-19 16:07:07 +08:00
context.Services.AddAutoMapperObjectMapper<CollectBusApplicationModule>();
Configure<AbpAutoMapperOptions>(options =>
{
options.AddMaps<CollectBusApplicationModule>(validate: true);
});
2025-04-09 14:33:20 +08:00
Configure<AbpKafkaOptions>(configuration.GetSection("Kafka"));
Configure<AbpKafkaEventBusOptions>(configuration.GetSection("Kafka:EventBus"));
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureConsumer = config =>
{
config.GroupId = configuration.GetValue<string>("Kafka:Consumer:GroupId");
config.EnableAutoCommit = configuration.GetValue<bool>("Kafka:Consumer:EnableAutoCommit");
};
});
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureProducer = config =>
{
config.MessageTimeoutMs = configuration.GetValue<int>("Kafka:Producer:MessageTimeoutMs");
config.Acks = (Acks)configuration.GetValue<int>("Kafka:Producer:Acks");
};
});
Configure<AbpKafkaOptions>(options =>
{
options.ConfigureTopic = specification =>
{
specification.ReplicationFactor = configuration.GetValue<short>("Kafka:Topic:ReplicationFactor");
specification.NumPartitions = configuration.GetValue<int>("Kafka:Topic:NumPartitions");
};
});
2024-12-19 16:07:07 +08:00
}
2025-03-14 13:45:29 +08:00
public override void OnApplicationInitialization(
2024-12-19 16:07:07 +08:00
ApplicationInitializationContext context)
{
2025-04-09 14:33:20 +08:00
context.ServiceProvider.GetRequiredService<CustomKafkaDistributedEventBus>().Initialize();
2024-12-19 16:07:07 +08:00
var assembly = Assembly.GetExecutingAssembly();
var types = assembly.GetTypes().Where(t => typeof(ICollectWorker).IsAssignableFrom(t) && !t.IsInterface).ToList();
foreach (var type in types)
{
2025-03-14 13:45:29 +08:00
context.AddBackgroundWorkerAsync(type);
2024-12-19 16:07:07 +08:00
}
2025-03-21 11:48:31 +08:00
//默认初始化表计信息
2025-04-11 17:43:19 +08:00
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
2025-04-07 17:35:37 +08:00
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
2025-04-10 17:06:53 +08:00
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
2025-04-11 17:43:19 +08:00
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
2025-04-14 09:29:12 +08:00
kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
2025-04-11 17:43:19 +08:00
}
2024-12-19 16:07:07 +08:00
}
}