From 6c8ffb3ae5429f18d925dda5b04fea80610fc7d6 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 17 Apr 2025 13:35:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JiShe.CollectBus.Host/appsettings.json | 8 +++---- .../CollectBusIoTDBModule.cs | 9 ++++++- .../CollectBusKafkaModule.cs | 24 +++++++++++++------ .../JiShe.CollectBus.Kafka.csproj | 4 ++++ .../KafkaOptionConfig.cs | 10 ++++++++ .../KafkaSubcribesExtensions.cs | 9 ++++--- 6 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 7aa3947..735ce0d 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -89,7 +89,9 @@ "SecurityProtocol": "SaslPlaintext", "SaslMechanism": "Plain", "SaslUserName": "lixiao", - "SaslPassword": "lixiao1980" + "SaslPassword": "lixiao1980", + "KafkaReplicationFactor": 3, + "NumPartitions": 30 //"Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 @@ -130,9 +132,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus3", - "KafkaReplicationFactor": 3, - "NumPartitions": 30, + "ServerTagName": "JiSheCollectBus3", "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 diff --git a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs index 444ab4e..62c63c4 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs @@ -1,6 +1,7 @@ using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDBProvider.Interface; using JiShe.CollectBus.IoTDBProvider.Provider; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; @@ -8,6 +9,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Volo.Abp.Modularity; +using static Thrift.Server.TThreadPoolAsyncServer; namespace JiShe.CollectBus.IoTDBProvider { @@ -15,7 +17,12 @@ namespace JiShe.CollectBus.IoTDBProvider { public override void ConfigureServices(ServiceConfigurationContext context) { - context.Services.Configure(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions))); + + var configuration = context.Services.GetConfiguration(); + Configure(options => + { + configuration.GetSection(nameof(IoTDBOptions)).Bind(options); + }); // 注册上下文为Scoped context.Services.AddScoped(); diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 80739a6..5740c7f 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Producer; using Microsoft.AspNetCore.Builder; @@ -17,14 +18,23 @@ namespace JiShe.CollectBus.Kafka public override void ConfigureServices(ServiceConfigurationContext context) { var configuration = context.Services.GetConfiguration(); - var kafkaSection = configuration.GetSection("Kafka"); - KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig (); - kafkaSection.Bind(kafkaOptionConfig); - if (configuration["ServerTagName"] != null) + //var kafkaSection = configuration.GetSection(CommonConst.Kafka); + //KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig (); + //kafkaSection.Bind(kafkaOptionConfig); + //if (configuration[CommonConst.ServerTagName] != null) + //{ + // kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!; + //} + //context.Services.AddSingleton(kafkaOptionConfig); + + context.Services.Configure(context.Services.GetConfiguration().GetSection(CommonConst.Kafka)); + + Configure(options => { - kafkaOptionConfig.ServerTagName = configuration["ServerTagName"]!; - } - context.Services.AddSingleton(kafkaOptionConfig); + configuration.GetSection(CommonConst.Kafka).Bind(options); + }); + + // 注册Producer context.Services.AddSingleton(); // 注册Consumer diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj index cef24d5..9518b0b 100644 --- a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj +++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj @@ -12,4 +12,8 @@ + + + + diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs index 50ae47e..28b80a5 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs @@ -19,6 +19,16 @@ namespace JiShe.CollectBus.Kafka /// public string ServerTagName { get; set; }= "KafkaFilterKey"; + /// + /// kafka主题副本数量 + /// + public int KafkaReplicationFactor { get; set; } + + /// + /// kafka主题分区数量 + /// + public int NumPartitions { get; set; } + /// /// 是否开启过滤器 /// diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 4c69b6a..d04ad4f 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,8 +1,10 @@ using Confluent.Kafka; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -95,12 +97,13 @@ namespace JiShe.CollectBus.Kafka .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) .Where(x => x.Attribute != null) .ToArray(); - + var configuration = provider.GetRequiredService(); int threadCount = 0; foreach (var sub in subscribedMethods) { - var adminClientService = provider.GetRequiredService(); - int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; + int partitionCount = configuration.GetValue(CommonConst.NumPartitions); + //var adminClientService = provider.GetRequiredService(); + //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) partitionCount = 1; for (int i = 0; i < partitionCount; i++)