Compare commits

..

No commits in common. "fa18377f107288f31731531a19fbd6d62ccda845" and "c97634e474001579ce478386eb7918fa7b800b23" have entirely different histories.

8 changed files with 76 additions and 178 deletions

View File

@ -86,8 +86,8 @@
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableFilter": true,
"EnableAuthorization": false,
"SecurityProtocol": "SaslPlaintext",
"SaslMechanism": "Plain",
"SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980"
//"Topic": {

View File

@ -16,15 +16,6 @@ namespace JiShe.CollectBus.Kafka
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
var kafkaSection = configuration.GetSection("Kafka");
KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig ();
kafkaSection.Bind(kafkaOptionConfig);
if (configuration["ServerTagName"] != null)
{
kafkaOptionConfig.ServerTagName = configuration["ServerTagName"]!;
}
context.Services.AddSingleton(kafkaOptionConfig);
// 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer
@ -34,39 +25,8 @@ namespace JiShe.CollectBus.Kafka
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var app = context.GetApplicationBuilder();
// 程序运行目录
//var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
//if (!string.IsNullOrWhiteSpace(assemblyPath))
//{
// var dllFiles = Directory.GetFiles(assemblyPath, "*.dll");
// var kafkaSubscriberAssemblies = new List<Assembly>();
// foreach (var file in dllFiles)
// {
// try
// {
// // 跳过已加载的程序集
// var assemblyName = AssemblyName.GetAssemblyName(file);
// var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
// .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
// var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// // 检查程序集是否包含 IKafkaSubscribe 的实现类
// var hasSubscriber = assembly.GetTypes()
// .Any(type =>
// typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口
// !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身
// if (hasSubscriber)
// {
// kafkaSubscriberAssemblies.Add(assembly);
// }
// }
// catch{}
// app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray());
// }
//}
// 获取程序集
app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
}
}
}

View File

@ -1,7 +1,16 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Attributes;
using Volo.Abp.DependencyInjection;
using JiShe.CollectBus.Kafka.AdminClient;
using static Confluent.Kafka.ConfigPropertyNames;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using NUglify.Html;
using Serilog;
using System;
using System.Text;
namespace JiShe.CollectBus.Kafka.Consumer
@ -12,14 +21,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, (object Consumer, CancellationTokenSource CTS)>
_consumerStore = new();
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger, KafkaOptionConfig kafkaOptionConfig)
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
}
#region private
@ -42,9 +49,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
private ConsumerConfig BuildConsumerConfig(string? groupId = null)
{
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ConsumerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
BootstrapServers = _configuration["Kafka:BootstrapServers"],
GroupId = groupId ?? "default",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
@ -53,12 +62,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
};
if (_kafkaOptionConfig.EnableAuthorization)
if (enableAuth)
{
config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _kafkaOptionConfig.SaslPassword;
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
return config;
@ -131,9 +140,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -199,9 +208,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(100, cts.Token);
continue;
}
if (_kafkaOptionConfig.EnableFilter)
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -287,9 +296,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -421,9 +430,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
else if (result.Message.Value != null)
{
if (_kafkaOptionConfig.EnableFilter)
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{

View File

@ -12,4 +12,8 @@
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -1,53 +0,0 @@
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class KafkaOptionConfig
{
/// <summary>
/// kafka地址
/// </summary>
public string BootstrapServers { get; set; } = null!;
/// <summary>
/// 服务器标识
/// </summary>
public string ServerTagName { get; set; }= "KafkaFilterKey";
/// <summary>
/// 是否开启过滤器
/// </summary>
public bool EnableFilter { get; set; }= true;
/// <summary>
/// 是否开启认证
/// </summary>
public bool EnableAuthorization { get; set; } = false;
/// <summary>
/// 安全协议
/// </summary>
public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext;
/// <summary>
/// 认证方式
/// </summary>
public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain;
/// <summary>
/// 用户名
/// </summary>
public string? SaslUserName { get; set; }
/// <summary>
/// 密码
/// </summary>
public string? SaslPassword { get; set; }
}
}

View File

@ -1,4 +1,7 @@
using Confluent.Kafka;
using DeviceDetectorNET;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
@ -6,54 +9,34 @@ using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka
{
public static class KafkaSubcribesExtensions
{
///// <summary>
///// 添加Kafka订阅
///// </summary>
///// <param name="app"></param>
///// <param name="assembly"></param>
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys)
//{
// var provider = app.ApplicationServices;
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
// lifetime.ApplicationStarted.Register(() =>
// {
// var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
// int threadCount = 0;
// int topicCount = 0;
// foreach (Assembly assembly in assemblys)
// {
// var subscribeTypes = assembly.GetTypes()
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
// .ToList();
// if (subscribeTypes.Count == 0) return;
// foreach (var subscribeType in subscribeTypes)
// {
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(subscribe => {
// if (subscribe is IKafkaSubscribe)
// {
// Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
// }
// }
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
// });
//}
/// <summary>
/// 添加Kafka订阅
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return;
var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
@ -62,11 +45,6 @@ namespace JiShe.CollectBus.Kafka
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();

View File

@ -19,12 +19,11 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, KafkaOptionConfig kafkaOptionConfig)
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
}
#region private
@ -52,9 +51,11 @@ namespace JiShe.CollectBus.Kafka.Producer
/// <returns></returns>
private ProducerConfig BuildProducerConfig()
{
var enableAuth = bool.Parse(_configuration["Kafka:EnableAuthorization"]!);
var config = new ProducerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
BootstrapServers = _configuration["Kafka:BootstrapServers"],
AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
@ -65,12 +66,12 @@ namespace JiShe.CollectBus.Kafka.Producer
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs
};
if (_kafkaOptionConfig.EnableAuthorization)
if (enableAuth)
{
config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol;
config.SaslMechanism = _kafkaOptionConfig.SaslMechanism;
config.SaslUsername = _kafkaOptionConfig.SaslUserName;
config.SaslPassword = _kafkaOptionConfig.SaslPassword;
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = _configuration["Kafka:SaslUserName"];
config.SaslPassword = _configuration["Kafka:SaslPassword"];
}
return config;
@ -109,7 +110,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
await producer.ProduceAsync(topic, message);
@ -130,7 +131,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
await producer.ProduceAsync(topic, message);
@ -154,7 +155,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
var typeKey = typeof(KafkaProducer<TKey, TValue>);
@ -188,7 +189,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) }
}
};
var typeKey = typeof(KafkaProducer<string, TValue>);

View File

@ -12,7 +12,6 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit;
using DotNetCore.CAP;
using JiShe.CollectBus.Kafka.Producer;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{