dev #2
@ -44,39 +44,12 @@ namespace JiShe.CollectBus.Kafka
|
||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||
{
|
||||
var app = context.GetApplicationBuilder();
|
||||
// 程序运行目录
|
||||
//var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
|
||||
//if (!string.IsNullOrWhiteSpace(assemblyPath))
|
||||
//{
|
||||
// var dllFiles = Directory.GetFiles(assemblyPath, "*.dll");
|
||||
// var kafkaSubscriberAssemblies = new List<Assembly>();
|
||||
// foreach (var file in dllFiles)
|
||||
// {
|
||||
// try
|
||||
// {
|
||||
// // 跳过已加载的程序集
|
||||
// var assemblyName = AssemblyName.GetAssemblyName(file);
|
||||
// var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
|
||||
// .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
|
||||
// var assembly = existingAssembly ?? Assembly.LoadFrom(file);
|
||||
|
||||
// // 检查程序集是否包含 IKafkaSubscribe 的实现类
|
||||
// var hasSubscriber = assembly.GetTypes()
|
||||
// .Any(type =>
|
||||
// typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口
|
||||
// !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身
|
||||
// 注册Subscriber
|
||||
app.ApplicationServices.UseKafkaSubscribers();
|
||||
|
||||
// if (hasSubscriber)
|
||||
// {
|
||||
// kafkaSubscriberAssemblies.Add(assembly);
|
||||
// }
|
||||
// }
|
||||
// catch{}
|
||||
// app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray());
|
||||
// }
|
||||
//}
|
||||
// 获取程序集
|
||||
app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
||||
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,6 +6,12 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
/// <summary>
|
||||
/// Kafka订阅者
|
||||
/// <para>
|
||||
/// 订阅者需要继承此接口并需要依赖注入,并使用<see cref="KafkaSubscribeAttribute"/>标记
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public interface IKafkaSubscribe
|
||||
{
|
||||
}
|
||||
|
||||
@ -1,17 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
public class KafkaOptions
|
||||
{
|
||||
public string BootstrapServers { get; set; }
|
||||
public string GroupId { get; set; }
|
||||
public Dictionary<string, string> ProducerConfig { get; set; } = new();
|
||||
public Dictionary<string, string> ConsumerConfig { get; set; } = new();
|
||||
public Dictionary<string, string> AdminConfig { get; set; } = new();
|
||||
}
|
||||
}
|
||||
@ -14,45 +14,57 @@ namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
public static class KafkaSubcribesExtensions
|
||||
{
|
||||
///// <summary>
|
||||
///// 添加Kafka订阅
|
||||
///// </summary>
|
||||
///// <param name="app"></param>
|
||||
///// <param name="assembly"></param>
|
||||
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys)
|
||||
//{
|
||||
// var provider = app.ApplicationServices;
|
||||
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||
/// <summary>
|
||||
/// 添加Kafka订阅
|
||||
/// </summary>
|
||||
/// <param name="app"></param>
|
||||
/// <param name="assembly"></param>
|
||||
public static void UseKafkaSubscribers(this IServiceProvider provider)
|
||||
{
|
||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||
|
||||
// lifetime.ApplicationStarted.Register(() =>
|
||||
// {
|
||||
// var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
||||
// int threadCount = 0;
|
||||
// int topicCount = 0;
|
||||
// foreach (Assembly assembly in assemblys)
|
||||
// {
|
||||
// var subscribeTypes = assembly.GetTypes()
|
||||
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
|
||||
// .ToList();
|
||||
|
||||
// if (subscribeTypes.Count == 0) return;
|
||||
// foreach (var subscribeType in subscribeTypes)
|
||||
// {
|
||||
// var subscribes = provider.GetServices(subscribeType).ToList();
|
||||
// subscribes.ForEach(subscribe => {
|
||||
|
||||
// if (subscribe is IKafkaSubscribe)
|
||||
// {
|
||||
// Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
|
||||
// threadCount += tuple.Item1;
|
||||
// topicCount += tuple.Item2;
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||
// });
|
||||
//}
|
||||
lifetime.ApplicationStarted.Register(() =>
|
||||
{
|
||||
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
|
||||
int threadCount = 0;
|
||||
int topicCount = 0;
|
||||
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
|
||||
if (string.IsNullOrWhiteSpace(assemblyPath))
|
||||
{
|
||||
logger.LogInformation($"kafka订阅未能找到程序路径");
|
||||
return;
|
||||
}
|
||||
var dllFiles = Directory.GetFiles(assemblyPath, "*.dll");
|
||||
foreach (var file in dllFiles)
|
||||
{
|
||||
// 跳过已加载的程序集
|
||||
var assemblyName = AssemblyName.GetAssemblyName(file);
|
||||
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
|
||||
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
|
||||
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
|
||||
// 实现IKafkaSubscribe接口
|
||||
var subscribeTypes = assembly.GetTypes().Where(type =>
|
||||
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
|
||||
!type.IsAbstract && !type.IsInterface).ToList(); ;
|
||||
if (subscribeTypes.Count == 0)
|
||||
continue;
|
||||
foreach (var subscribeType in subscribeTypes)
|
||||
{
|
||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||
subscribes.ForEach(subscribe =>
|
||||
{
|
||||
if (subscribe!=null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
|
||||
threadCount += tuple.Item1;
|
||||
topicCount += tuple.Item2;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||
});
|
||||
}
|
||||
|
||||
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
|
||||
{
|
||||
@ -74,7 +86,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||
subscribes.ForEach(subscribe => {
|
||||
|
||||
if (subscribe is IKafkaSubscribe)
|
||||
if (subscribe != null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
|
||||
threadCount += tuple.Item1;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user