From 3d56d351d3298238b6f675ac25c5c301bd072bda Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 14 Apr 2025 09:29:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusApplicationModule.cs | 2 +- .../BasicScheduledMeterReadingService.cs | 2 ++ .../Workers/CreateToBeIssueTaskWorker.cs | 4 ++-- .../AdminClient/AdminClientService.cs | 4 ++++ .../Producer/ProducerService.cs | 4 ++-- 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index e1ac44e..8544639 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -115,7 +115,7 @@ public class CollectBusApplicationModule : AbpModule foreach (var item in topics) { - kafkaAdminClient.CreateTopicIfNotExistAsync(item, configuration.GetValue(CommonConst.KafkaReplicationFactor), configuration.GetValue(CommonConst.NumPartitions)); + kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); } } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 2e58879..6803171 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -95,6 +95,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } + //检查任务时间节点 + //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 var tempArryay = item.Split(":"); string meteryType = tempArryay[3];//表计类别 diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index b936bbb..05fd90d 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -27,14 +27,14 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = $"*/{1} * * * *"; ; + CronExpression = $"{10}/* * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - //await _scheduledMeterReadingService.CreateToBeIssueTasks(); + await _scheduledMeterReadingService.CreateToBeIssueTasks(); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs index e687276..217e35c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -71,8 +71,12 @@ namespace JiShe.CollectBus.Kafka.AdminClient public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) { + try { + if (await CheckTopicAsync(topic)) return; + + await Instance.CreateTopicsAsync(new[] { new TopicSpecification diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index a927716..0cfed2e 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -14,9 +14,9 @@ namespace JiShe.CollectBus.Kafka.Producer public class ProducerService : IProducerService, IDisposable,ITransientDependency { - private readonly ILogger> _logger; + private readonly ILogger> _logger; - protected ProducerService(IConfiguration configuration, ILogger> logger) + protected ProducerService(IConfiguration configuration, ILogger> logger) { _logger = logger; GetInstance(configuration);