Compare commits

..

No commits in common. "f2419a70983cb8e3f839bab9fceadf73e33e4720" and "949370c0c6c0484d6ae397f9e49a5fcce1685eba" have entirely different histories.

5 changed files with 5 additions and 32 deletions

View File

@ -29,8 +29,6 @@ using Volo.Abp.EventBus.Kafka;
using Volo.Abp.Kafka; using Volo.Abp.Kafka;
using Volo.Abp.EventBus; using Volo.Abp.EventBus;
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Common.Consts;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -100,23 +98,12 @@ public class CollectBusApplicationModule : AbpModule
{ {
context.AddBackgroundWorkerAsync(type); context.AddBackgroundWorkerAsync(type);
} }
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
//默认初始化表计信息 //默认初始化表计信息
var dbContext = context.ServiceProvider.GetRequiredService<EnergySystemScheduledMeterReadingService>();
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); //dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicIfNotExistAsync(item, configuration.GetValue<short>(CommonConst.KafkaReplicationFactor), configuration.GetValue<int>(CommonConst.NumPartitions));
}
} }
} }

View File

@ -28,7 +28,6 @@
<ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Application.Contracts\JiShe.CollectBus.Application.Contracts.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.MongoDB\JiShe.CollectBus.MongoDB.csproj" />
<ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Protocol.Contracts\JiShe.CollectBus.Protocol.Contracts.csproj" />
</ItemGroup> </ItemGroup>

View File

@ -20,16 +20,5 @@ namespace JiShe.CollectBus.Common.Consts
/// Kafka /// Kafka
/// </summary> /// </summary>
public const string Kafka = "Kafka"; public const string Kafka = "Kafka";
/// <summary>
/// Kafka副本数量
/// </summary>
public const string KafkaReplicationFactor = "KafkaReplicationFactor";
/// <summary>
/// Kafka主题分区数量
/// </summary>
public const string NumPartitions = "NumPartitions";
} }
} }

View File

@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context); ConfigureHangfire(context);
ConfigureCap(context, configuration); ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration); //ConfigureMassTransit(context, configuration);
//ConfigureKafkaTopic(context, configuration); ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context); ConfigureAuditLog(context);
ConfigureCustom(context, configuration); ConfigureCustom(context, configuration);
} }

View File

@ -127,7 +127,5 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus", "ServerTagName": "JiSheCollectBus"
"KafkaReplicationFactor": 3,
"NumPartitions": 60
} }