优化Kafka初始化主题。
This commit is contained in:
parent
99f96d889e
commit
ad89d2716c
@ -29,6 +29,8 @@ using Volo.Abp.EventBus.Kafka;
|
||||
using Volo.Abp.Kafka;
|
||||
using Volo.Abp.EventBus;
|
||||
using Confluent.Kafka;
|
||||
using JiShe.CollectBus.Kafka.AdminClient;
|
||||
using JiShe.CollectBus.Common.Consts;
|
||||
|
||||
namespace JiShe.CollectBus;
|
||||
|
||||
@ -99,11 +101,23 @@ public class CollectBusApplicationModule : AbpModule
|
||||
context.AddBackgroundWorkerAsync(type);
|
||||
}
|
||||
|
||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||
|
||||
//默认初始化表计信息
|
||||
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
|
||||
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
|
||||
//初始化主题信息
|
||||
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
||||
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
|
||||
|
||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||
|
||||
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
||||
foreach (var item in topics)
|
||||
{
|
||||
kafkaAdminClient.CreateTopicIfNotExistAsync(item, configuration.GetValue<short>(CommonConst.KafkaReplicationFactor), configuration.GetValue<int>(CommonConst.NumPartitions));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
@ -20,5 +20,16 @@ namespace JiShe.CollectBus.Common.Consts
|
||||
/// Kafka
|
||||
/// </summary>
|
||||
public const string Kafka = "Kafka";
|
||||
|
||||
/// <summary>
|
||||
/// Kafka副本数量
|
||||
/// </summary>
|
||||
public const string KafkaReplicationFactor = "KafkaReplicationFactor";
|
||||
|
||||
/// <summary>
|
||||
/// Kafka主题分区数量
|
||||
/// </summary>
|
||||
public const string NumPartitions = "NumPartitions";
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host
|
||||
ConfigureHangfire(context);
|
||||
ConfigureCap(context, configuration);
|
||||
//ConfigureMassTransit(context, configuration);
|
||||
ConfigureKafkaTopic(context, configuration);
|
||||
//ConfigureKafkaTopic(context, configuration);
|
||||
ConfigureAuditLog(context);
|
||||
ConfigureCustom(context, configuration);
|
||||
}
|
||||
|
||||
@ -127,5 +127,7 @@
|
||||
"OpenDebugMode": true,
|
||||
"UseTableSessionPoolByDefault": false
|
||||
},
|
||||
"ServerTagName": "JiSheCollectBus"
|
||||
"ServerTagName": "JiSheCollectBus",
|
||||
"KafkaReplicationFactor": 3,
|
||||
"NumPartitions": 60
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user