This commit is contained in:
cli 2025-04-12 15:11:32 +08:00
commit a57910f4b2
5 changed files with 32 additions and 5 deletions

View File

@ -29,6 +29,8 @@ using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Kafka; using Volo.Abp.Kafka;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Common.Consts;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -98,12 +100,23 @@ public class CollectBusApplicationModule : AbpModule
{ {
context.AddBackgroundWorkerAsync(type); context.AddBackgroundWorkerAsync(type);
} }
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
//dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicIfNotExistAsync(item, configuration.GetValue<short>(CommonConst.KafkaReplicationFactor), configuration.GetValue<int>(CommonConst.NumPartitions));
}
} }
} }

View File

@ -28,6 +28,7 @@
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -20,5 +20,16 @@ namespace JiShe.CollectBus.Common.Consts
/// Kafka /// Kafka
/// </summary> /// </summary>
public const string Kafka = "Kafka"; public const string Kafka = "Kafka";
/// <summary>
/// Kafka副本数量
/// </summary>
public const string KafkaReplicationFactor = "KafkaReplicationFactor";
/// <summary>
/// Kafka主题分区数量
/// </summary>
public const string NumPartitions = "NumPartitions";
} }
} }

View File

@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context); ConfigureHangfire(context);
ConfigureCap(context, configuration); ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration); //ConfigureMassTransit(context, configuration);
ConfigureKafkaTopic(context, configuration); //ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
} }

View File

@ -128,5 +128,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus" "ServerTagName": "JiSheCollectBus",
"KafkaReplicationFactor": 3,
"NumPartitions": 60
} }