diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index a0438db..96237bb 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -137,9 +137,13 @@ namespace JiShe.CollectBus.Kafka foreach (var sub in subscribedMethods) { - int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; + //int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; var adminClientService = provider.GetRequiredService(); + int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); + + int partitionCount = sub.Attribute!.TaskCount == -1 ? topicCount : sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions; + partitionCount = partitionCount > topicCount ? topicCount : partitionCount; //partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0)