From 8fd5f985ab23cd794438dfa3ebfd554726996b0e Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Thu, 17 Apr 2025 11:42:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusKafkaModule.cs | 42 ++++++++++++++- .../Consumer/ConsumerService.cs | 43 ++++++--------- .../JiShe.CollectBus.Kafka.csproj | 4 -- .../KafkaOptionConfig.cs | 53 +++++++++++++++++++ .../KafkaSubcribesExtensions.cs | 50 +++++++---------- .../Producer/ProducerService.cs | 27 +++++----- 6 files changed, 144 insertions(+), 75 deletions(-) create mode 100644 src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index fe0e866..48a8c35 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -16,6 +16,15 @@ namespace JiShe.CollectBus.Kafka { public override void ConfigureServices(ServiceConfigurationContext context) { + var configuration = context.Services.GetConfiguration(); + var kafkaSection = configuration.GetSection("Kafka"); + KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig (); + kafkaSection.Bind(kafkaOptionConfig); + if (configuration["ServerTagName"] != null) + { + kafkaOptionConfig.ServerTagName = configuration["ServerTagName"]!; + } + context.Services.AddSingleton(kafkaOptionConfig); // 注册Producer context.Services.AddSingleton(); // 注册Consumer @@ -25,8 +34,39 @@ namespace JiShe.CollectBus.Kafka public override void OnApplicationInitialization(ApplicationInitializationContext context) { var app = context.GetApplicationBuilder(); - app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); + // 程序运行目录 + var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); + if (!string.IsNullOrWhiteSpace(assemblyPath)) + { + var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); + var kafkaSubscriberAssemblies = new List(); + foreach (var file in dllFiles) + { + try + { + // 跳过已加载的程序集 + var assemblyName = AssemblyName.GetAssemblyName(file); + var existingAssembly = AppDomain.CurrentDomain.GetAssemblies() + .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); + var assembly = existingAssembly ?? Assembly.LoadFrom(file); + // 检查程序集是否包含 IKafkaSubscribe 的实现类 + var hasSubscriber = assembly.GetTypes() + .Any(type => + typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口 + !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身 + + if (hasSubscriber) + { + kafkaSubscriberAssemblies.Add(assembly); + } + } + catch{} + app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray()); + } + } + // 获取程序集 + //app.UseKafkaSubscribers(new[] { Assembly.Load("JiShe.CollectBus.Application")}); } } } diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index da9258c..eeb5661 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -1,16 +1,7 @@ using Confluent.Kafka; using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using JiShe.CollectBus.Kafka.Attributes; -using Volo.Abp.DependencyInjection; -using JiShe.CollectBus.Kafka.AdminClient; -using static Confluent.Kafka.ConfigPropertyNames; using System.Collections.Concurrent; -using System.Text.RegularExpressions; -using NUglify.Html; -using Serilog; -using System; using System.Text; namespace JiShe.CollectBus.Kafka.Consumer @@ -21,12 +12,14 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _consumerStore = new(); + private readonly KafkaOptionConfig _kafkaOptionConfig; private class KafkaConsumer where TKey : notnull where TValue : class { } - public ConsumerService(IConfiguration configuration, ILogger logger) + public ConsumerService(IConfiguration configuration, ILogger logger, KafkaOptionConfig kafkaOptionConfig) { _configuration = configuration; _logger = logger; + _kafkaOptionConfig = kafkaOptionConfig; } #region private 私有方法 @@ -49,11 +42,9 @@ namespace JiShe.CollectBus.Kafka.Consumer private ConsumerConfig BuildConsumerConfig(string? groupId = null) { - var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!); - var config = new ConsumerConfig { - BootstrapServers = _configuration["Kafka:BootstrapServers"], + BootstrapServers = _kafkaOptionConfig.BootstrapServers, GroupId = groupId ?? "default", AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit @@ -62,12 +53,12 @@ namespace JiShe.CollectBus.Kafka.Consumer FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB) }; - if (enableAuth) + if (_kafkaOptionConfig.EnableAuthorization) { - config.SecurityProtocol = SecurityProtocol.SaslPlaintext; - config.SaslMechanism = SaslMechanism.Plain; - config.SaslUsername = _configuration["Kafka:SaslUserName"]; - config.SaslPassword = _configuration["Kafka:SaslPassword"]; + config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol; + config.SaslMechanism = _kafkaOptionConfig.SaslMechanism; + config.SaslUsername = _kafkaOptionConfig.SaslUserName; + config.SaslPassword = _kafkaOptionConfig.SaslPassword; } return config; @@ -140,9 +131,9 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); continue; } - if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) + if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -208,9 +199,9 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(100, cts.Token); continue; } - if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) + if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -296,9 +287,9 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) + if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -430,9 +421,9 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) + if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj index 9518b0b..cef24d5 100644 --- a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj +++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj @@ -12,8 +12,4 @@ - - - - diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs new file mode 100644 index 0000000..50ae47e --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs @@ -0,0 +1,53 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public class KafkaOptionConfig + { + /// + /// kafka地址 + /// + public string BootstrapServers { get; set; } = null!; + + /// + /// 服务器标识 + /// + public string ServerTagName { get; set; }= "KafkaFilterKey"; + + /// + /// 是否开启过滤器 + /// + public bool EnableFilter { get; set; }= true; + + /// + /// 是否开启认证 + /// + public bool EnableAuthorization { get; set; } = false; + + /// + /// 安全协议 + /// + public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext; + + /// + /// 认证方式 + /// + public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain; + + /// + /// 用户名 + /// + public string? SaslUserName { get; set; } + + /// + /// 密码 + /// + public string? SaslPassword { get; set; } + + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 8860061..1f83540 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,7 +1,4 @@ using Confluent.Kafka; -using DeviceDetectorNET; -using JiShe.CollectBus.Common.Enums; -using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; @@ -9,16 +6,7 @@ using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Primitives; -using Newtonsoft.Json; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; -using System.Threading.Tasks; -using static Confluent.Kafka.ConfigPropertyNames; namespace JiShe.CollectBus.Kafka { @@ -29,14 +17,8 @@ namespace JiShe.CollectBus.Kafka /// /// /// - public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) + public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys) { - var subscribeTypes = assembly.GetTypes() - .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) - .ToList(); - - if (subscribeTypes.Count == 0) return; - var provider = app.ApplicationServices; var lifetime = provider.GetRequiredService(); @@ -45,18 +27,26 @@ namespace JiShe.CollectBus.Kafka var logger = provider.GetRequiredService>(); int threadCount = 0; int topicCount = 0; - foreach (var subscribeType in subscribeTypes) + foreach (Assembly assembly in assemblys) { - var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(subscribe => { - - if(subscribe is IKafkaSubscribe) - { - Tuple tuple= BuildKafkaSubscriber(subscribe, provider, logger); - threadCount+= tuple.Item1; - topicCount+= tuple.Item2; - } - }); + var subscribeTypes = assembly.GetTypes() + .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) + .ToList(); + + if (subscribeTypes.Count == 0) return; + foreach (var subscribeType in subscribeTypes) + { + var subscribes = provider.GetServices(subscribeType).ToList(); + subscribes.ForEach(subscribe => { + + if (subscribe is IKafkaSubscribe) + { + Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger); + threadCount += tuple.Item1; + topicCount += tuple.Item2; + } + }); + } } logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); }); diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 0ddf36b..27702e0 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -19,11 +19,12 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } - - public ProducerService(IConfiguration configuration,ILogger logger) + private readonly KafkaOptionConfig _kafkaOptionConfig; + public ProducerService(IConfiguration configuration,ILogger logger, KafkaOptionConfig kafkaOptionConfig) { _configuration = configuration; _logger = logger; + _kafkaOptionConfig = kafkaOptionConfig; } #region private 私有方法 @@ -51,11 +52,9 @@ namespace JiShe.CollectBus.Kafka.Producer /// private ProducerConfig BuildProducerConfig() { - var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!); - var config = new ProducerConfig { - BootstrapServers = _configuration["Kafka:BootstrapServers"], + BootstrapServers = _kafkaOptionConfig.BootstrapServers, AllowAutoCreateTopics = true, QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd @@ -66,12 +65,12 @@ namespace JiShe.CollectBus.Kafka.Producer MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs }; - if (enableAuth) + if (_kafkaOptionConfig.EnableAuthorization) { - config.SecurityProtocol = SecurityProtocol.SaslPlaintext; - config.SaslMechanism = SaslMechanism.Plain; - config.SaslUsername = _configuration["Kafka:SaslUserName"]; - config.SaslPassword = _configuration["Kafka:SaslPassword"]; + config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol; + config.SaslMechanism = _kafkaOptionConfig.SaslMechanism; + config.SaslUsername = _kafkaOptionConfig.SaslUserName; + config.SaslPassword = _kafkaOptionConfig.SaslPassword; } return config; @@ -110,7 +109,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -131,7 +130,7 @@ namespace JiShe.CollectBus.Kafka.Producer { Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -155,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } }; var typeKey = typeof(KafkaProducer); @@ -189,7 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer { Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } + { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } } }; var typeKey = typeof(KafkaProducer);