From 0ba64a4b903d54c3660f6ad4bb835233dd757696 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 14:39:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnergySystem/EnergySystemAppService.cs | 1 + .../Plugins/TcpMonitor.cs | 1 + .../Samples/SampleAppService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 1 + .../Subscribers/WorkerSubscriberAppService.cs | 1 + .../Consts}/ProtocolConst.cs | 2 +- .../Extensions/ProtocolConstExtensions.cs | 5 ++- .../CollectBusHostModule.Configure.cs | 40 ------------------- .../Consumer/ConsumerService.cs | 2 +- .../KafkaSubcribesExtensions.cs | 39 ++++++++++++++---- .../Abstracts/BaseProtocolPlugin.cs | 1 + ...JiShe.CollectBus.Protocol.Contracts.csproj | 4 ++ 12 files changed, 47 insertions(+), 52 deletions(-) rename src/{JiShe.CollectBus.Protocol.Contracts => JiShe.CollectBus.Common/Consts}/ProtocolConst.cs (99%) rename src/{JiShe.CollectBus.Protocol.Contracts => JiShe.CollectBus.Common}/Extensions/ProtocolConstExtensions.cs (95%) diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 1ca731b..9fc8dd4 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using DeviceDetectorNET.Class.Device; using DotNetCore.CAP; using JiShe.CollectBus.Common.BuildSendDatas; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 6cbc8c4..c2bd026 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 15b094a..09bd962 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -265,7 +265,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS return aa == null; } - [KafkaSubscribe("test-topic1")] + //[KafkaSubscribe("test-topic1")] public async Task KafkaSubscribeAsync(object obj) { diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 3ec6936..ce41db1 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -1,4 +1,5 @@ using DotNetCore.CAP; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 47cff47..65ecc01 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; using DotNetCore.CAP; +using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs similarity index 99% rename from src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs rename to src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs index d1e5739..df96c29 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/ProtocolConst.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Protocol.Contracts +namespace JiShe.CollectBus.Common.Consts { public class ProtocolConst { diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/ProtocolConstExtensions.cs similarity index 95% rename from src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs rename to src/JiShe.CollectBus.Common/Extensions/ProtocolConstExtensions.cs index fc52f31..1dbb301 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/ProtocolConstExtensions.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using System; using System.Collections.Generic; @@ -7,7 +8,7 @@ using System.Reflection; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Protocol.Contracts +namespace JiShe.CollectBus.Common.Extensions { public class ProtocolConstExtensions { diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 35bb46b..82cc214 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -361,45 +361,5 @@ namespace JiShe.CollectBus.Host }); }); } - - /// - /// 配置Kafka主题 - /// - /// - /// - public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration) - { - var adminClient = new AdminClientBuilder(new AdminClientConfig - { - BootstrapServers = configuration.GetConnectionString(CommonConst.Kafka) - }).Build(); - - try - { - - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - - List topicSpecifications = new List(); - foreach (var item in topics) - { - topicSpecifications.Add(new TopicSpecification - { - Name = item, - NumPartitions = 3, - ReplicationFactor = 1 - }); - } - adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult(); - } - catch (CreateTopicsException e) - { - if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) - { - throw; - } - } - - } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 24f2029..fd11df4 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -50,7 +50,7 @@ namespace JiShe.CollectBus.Kafka.Consumer AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 - AllowAutoCreateTopics= true, // 启用自动创建 + //AllowAutoCreateTopics= true, // 启用自动创建 FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index e830dc4..c28c82c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,5 +1,6 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; @@ -8,6 +9,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System.Reflection; namespace JiShe.CollectBus.Kafka @@ -23,6 +25,18 @@ namespace JiShe.CollectBus.Kafka { var lifetime = provider.GetRequiredService(); + //初始化主题信息 + var kafkaAdminClient = provider.GetRequiredService(); + var kafkaOptions = provider.GetRequiredService>(); + + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + + foreach (var item in topics) + { + kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); + } + lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); @@ -55,7 +69,7 @@ namespace JiShe.CollectBus.Kafka { if (subscribe!=null) { - Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -70,6 +84,17 @@ namespace JiShe.CollectBus.Kafka { var provider = app.ApplicationServices; var lifetime = provider.GetRequiredService(); + //初始化主题信息 + var kafkaAdminClient = provider.GetRequiredService(); + var kafkaOptions = provider.GetRequiredService>(); + + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + + foreach (var item in topics) + { + kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); + } lifetime.ApplicationStarted.Register(() => { @@ -88,7 +113,7 @@ namespace JiShe.CollectBus.Kafka if (subscribe != null) { - Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -103,17 +128,17 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static Tuple BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger logger) + private static Tuple BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) .Where(x => x.Attribute != null) .ToArray(); - var configuration = provider.GetRequiredService(); + //var configuration = provider.GetRequiredService(); int threadCount = 0; foreach (var sub in subscribedMethods) { - int partitionCount = configuration.GetValue(CommonConst.NumPartitions); + int partitionCount = kafkaOptionConfig.NumPartitions; //var adminClientService = provider.GetRequiredService(); //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) @@ -154,7 +179,7 @@ namespace JiShe.CollectBus.Kafka logger.LogError($"kafka批量消费异常:{ex.Message}"); } return await Task.FromResult(false); - }); + }, attr.GroupId, attr.BatchSize,attr.BatchTimeout); } else { @@ -171,7 +196,7 @@ namespace JiShe.CollectBus.Kafka logger.LogError($"kafka消费异常:{ex.Message}"); } return await Task.FromResult(false); - }); + }, attr.GroupId); } } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 52ef129..bc066fe 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -13,6 +13,7 @@ using JiShe.CollectBus.IotSystems.Protocols; using MassTransit; using DotNetCore.CAP; using JiShe.CollectBus.Kafka.Producer; +using JiShe.CollectBus.Common.Consts; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts { diff --git a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 3aa7a77..fc0f12e 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -20,4 +20,8 @@ + + + +