dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
6 changed files with 144 additions and 75 deletions
Showing only changes of commit 8fd5f985ab - Show all commits

View File

@ -16,6 +16,15 @@ namespace JiShe.CollectBus.Kafka
{ {
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
var configuration = context.Services.GetConfiguration();
var kafkaSection = configuration.GetSection("Kafka");
KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig ();
kafkaSection.Bind(kafkaOptionConfig);
if (configuration["ServerTagName"] != null)
{
kafkaOptionConfig.ServerTagName = configuration["ServerTagName"]!;
}
context.Services.AddSingleton(kafkaOptionConfig);
// 注册Producer // 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>(); context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer // 注册Consumer
@ -25,8 +34,39 @@ namespace JiShe.CollectBus.Kafka
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
{ {
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); // 程序运行目录
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (!string.IsNullOrWhiteSpace(assemblyPath))
{
var dllFiles = Directory.GetFiles(assemblyPath, "*.dll");
var kafkaSubscriberAssemblies = new List<Assembly>();
foreach (var file in dllFiles)
{
try
{
// 跳过已加载的程序集
var assemblyName = AssemblyName.GetAssemblyName(file);
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 检查程序集是否包含 IKafkaSubscribe 的实现类
var hasSubscriber = assembly.GetTypes()
.Any(type =>
typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口
!type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身
if (hasSubscriber)
{
kafkaSubscriberAssemblies.Add(assembly);
}
}
catch{}
app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray());
}
}
// 获取程序集
//app.UseKafkaSubscribers(new[] { Assembly.Load("JiShe.CollectBus.Application")});
} }
} }
} }

View File

@ -1,16 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
using Serilog;
using System;
using System.Text; using System.Text;
namespace JiShe.CollectBus.Kafka.Consumer namespace JiShe.CollectBus.Kafka.Consumer
@ -21,12 +12,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly IConfiguration _configuration; private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)> private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new(); _consumerStore = new();
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { } private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger) public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger, KafkaOptionConfig kafkaOptionConfig)
{ {
_configuration = configuration; _configuration = configuration;
_logger = logger; _logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
} }
#region private #region private
@ -49,11 +42,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
private ConsumerConfig BuildConsumerConfig(string? groupId = null) private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{ {
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ConsumerConfig var config = new ConsumerConfig
{ {
BootstrapServers = _configuration["Kafka:BootstrapServers"], BootstrapServers = _kafkaOptionConfig.BootstrapServers,
GroupId = groupId ?? "default", GroupId = groupId ?? "default",
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
@ -62,12 +53,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };
if (enableAuth) if (_kafkaOptionConfig.EnableAuthorization)
{ {
config.SecurityProtocol = SecurityProtocol.SaslPlaintext; config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = SaslMechanism.Plain; config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _configuration["Kafka:SaslUserName"]; config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _configuration["Kafka:SaslPassword"]; config.SaslPassword = _kafkaOptionConfig.SaslPassword;
} }
return config; return config;
@ -140,9 +131,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue; continue;
} }
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) if (_kafkaOptionConfig.EnableFilter)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
@ -208,9 +199,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(100, cts.Token); await Task.Delay(100, cts.Token);
continue; continue;
} }
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) if (_kafkaOptionConfig.EnableFilter)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
@ -296,9 +287,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{ {
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) if (_kafkaOptionConfig.EnableFilter)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
@ -430,9 +421,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{ {
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) if (_kafkaOptionConfig.EnableFilter)
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {

View File

@ -12,8 +12,4 @@
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project> </Project>

View File

@ -0,0 +1,53 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class KafkaOptionConfig
{
/// <summary>
/// kafka地址
/// </summary>
public string BootstrapServers { get; set; } = null!;
/// <summary>
/// 服务器标识
/// </summary>
public string ServerTagName { get; set; }= "KafkaFilterKey";
/// <summary>
/// 是否开启过滤器
/// </summary>
public bool EnableFilter { get; set; }= true;
/// <summary>
/// 是否开启认证
/// </summary>
public bool EnableAuthorization { get; set; } = false;
/// <summary>
/// 安全协议
/// </summary>
public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext;
/// <summary>
/// 认证方式
/// </summary>
public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain;
/// <summary>
/// 用户名
/// </summary>
public string? SaslUserName { get; set; }
/// <summary>
/// 密码
/// </summary>
public string? SaslPassword { get; set; }
}
}

View File

@ -1,7 +1,4 @@
using Confluent.Kafka; using Confluent.Kafka;
using DeviceDetectorNET;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
@ -9,16 +6,7 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection; using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
{ {
@ -29,14 +17,8 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="app"></param> /// <param name="app"></param>
/// <param name="assembly"></param> /// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys)
{ {
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return;
var provider = app.ApplicationServices; var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
@ -45,18 +27,26 @@ namespace JiShe.CollectBus.Kafka
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; int threadCount = 0;
int topicCount = 0; int topicCount = 0;
foreach (var subscribeType in subscribeTypes) foreach (Assembly assembly in assemblys)
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribeTypes = assembly.GetTypes()
subscribes.ForEach(subscribe => { .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if(subscribe is IKafkaSubscribe)
{ if (subscribeTypes.Count == 0) return;
Tuple<int, int> tuple= BuildKafkaSubscriber(subscribe, provider, logger); foreach (var subscribeType in subscribeTypes)
threadCount+= tuple.Item1; {
topicCount+= tuple.Item2; var subscribes = provider.GetServices(subscribeType).ToList();
} subscribes.ForEach(subscribe => {
});
if (subscribe is IKafkaSubscribe)
{
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
}
} }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); });

View File

@ -19,11 +19,12 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly IConfiguration _configuration; private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, object> _producerCache = new(); private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { } private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger) public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, KafkaOptionConfig kafkaOptionConfig)
{ {
_configuration = configuration; _configuration = configuration;
_logger = logger; _logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
} }
#region private #region private
@ -51,11 +52,9 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns> /// <returns></returns>
private ProducerConfig BuildProducerConfig() private ProducerConfig BuildProducerConfig()
{ {
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ProducerConfig var config = new ProducerConfig
{ {
BootstrapServers = _configuration["Kafka:BootstrapServers"], BootstrapServers = _kafkaOptionConfig.BootstrapServers,
AllowAutoCreateTopics = true, AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
@ -66,12 +65,12 @@ namespace JiShe.CollectBus.Kafka.Producer
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs
}; };
if (enableAuth) if (_kafkaOptionConfig.EnableAuthorization)
{ {
config.SecurityProtocol = SecurityProtocol.SaslPlaintext; config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = SaslMechanism.Plain; config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _configuration["Kafka:SaslUserName"]; config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _configuration["Kafka:SaslPassword"]; config.SaslPassword = _kafkaOptionConfig.SaslPassword;
} }
return config; return config;
@ -110,7 +109,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key, Key = key,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
await producer.ProduceAsync(topic, message); await producer.ProduceAsync(topic, message);
@ -131,7 +130,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{ {
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
await producer.ProduceAsync(topic, message); await producer.ProduceAsync(topic, message);
@ -155,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key, Key = key,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<TKey, TValue>); var typeKey = typeof(KafkaProducer<TKey, TValue>);
@ -189,7 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{ {
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
} }
}; };
var typeKey = typeof(KafkaProducer<string, TValue>); var typeKey = typeof(KafkaProducer<string, TValue>);