diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 87d63d0..453855d 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -29,6 +29,8 @@ using Volo.Abp.EventBus.Kafka; using Volo.Abp.Kafka; using Volo.Abp.EventBus; using Confluent.Kafka; +using JiShe.CollectBus.Kafka.AdminClient; +using JiShe.CollectBus.Common.Consts; namespace JiShe.CollectBus; @@ -98,12 +100,24 @@ public class CollectBusApplicationModule : AbpModule { context.AddBackgroundWorkerAsync(type); } - - var dbContext = context.ServiceProvider.GetRequiredService(); //默认初始化表计信息 + var dbContext = context.ServiceProvider.GetRequiredService(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + + //初始化主题信息 + var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); + var configuration = context.ServiceProvider.GetRequiredService(); + + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + + List topicSpecifications = new List(); + foreach (var item in topics) + { + kafkaAdminClient.CreateTopicIfNotExistAsync(item, configuration.GetValue(CommonConst.KafkaReplicationFactor), configuration.GetValue(CommonConst.NumPartitions)); + } } } diff --git a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj index ef887fb..5765ecd 100644 --- a/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj +++ b/src/JiShe.CollectBus.Application/JiShe.CollectBus.Application.csproj @@ -28,6 +28,7 @@ + diff --git a/src/JiShe.CollectBus.Common/Consts/CommonConst.cs b/src/JiShe.CollectBus.Common/Consts/CommonConst.cs index ff38f22..471c897 100644 --- a/src/JiShe.CollectBus.Common/Consts/CommonConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/CommonConst.cs @@ -20,5 +20,16 @@ namespace JiShe.CollectBus.Common.Consts /// Kafka /// public const string Kafka = "Kafka"; + + /// + /// Kafka副本数量 + /// + public const string KafkaReplicationFactor = "KafkaReplicationFactor"; + + /// + /// Kafka主题分区数量 + /// + public const string NumPartitions = "NumPartitions"; + } } diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs index 31f2651..aabc2ba 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host ConfigureHangfire(context); ConfigureCap(context, configuration); //ConfigureMassTransit(context, configuration); - ConfigureKafkaTopic(context, configuration); + //ConfigureKafkaTopic(context, configuration); ConfigureAuditLog(context); ConfigureCustom(context, configuration); } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 2c72f7d..4327ff5 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -127,5 +127,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus" + "ServerTagName": "JiSheCollectBus", + "KafkaReplicationFactor": 3, + "NumPartitions": 60 } \ No newline at end of file