Compare commits

..

No commits in common. "4c5f7231bf7dcd267ab30ac8c4c214c40630a653" and "6c8ffb3ae5429f18d925dda5b04fea80610fc7d6" have entirely different histories.

4 changed files with 88 additions and 62 deletions

View File

@ -44,12 +44,39 @@ namespace JiShe.CollectBus.Kafka
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
{ {
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 程序运行目录
// 注册Subscriber //var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
app.ApplicationServices.UseKafkaSubscribers(); //if (!string.IsNullOrWhiteSpace(assemblyPath))
//{
// var dllFiles = Directory.GetFiles(assemblyPath, "*.dll");
// var kafkaSubscriberAssemblies = new List<Assembly>();
// foreach (var file in dllFiles)
// {
// try
// {
// // 跳过已加载的程序集
// var assemblyName = AssemblyName.GetAssemblyName(file);
// var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
// .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
// var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// // 检查程序集是否包含 IKafkaSubscribe 的实现类
// var hasSubscriber = assembly.GetTypes()
// .Any(type =>
// typeof(IKafkaSubscribe).IsAssignableFrom(type) && // 实现接口
// !type.IsAbstract && !type.IsInterface); // 排除抽象类和接口本身
// if (hasSubscriber)
// {
// kafkaSubscriberAssemblies.Add(assembly);
// }
// }
// catch{}
// app.UseKafkaSubscribers(kafkaSubscriberAssemblies.ToArray());
// }
//}
// 获取程序集 // 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
} }
} }
} }

View File

@ -6,12 +6,6 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
{ {
/// <summary>
/// Kafka订阅者
/// <para>
/// 订阅者需要继承此接口并需要依赖注入,并使用<see cref="KafkaSubscribeAttribute"/>标记
/// </para>
/// </summary>
public interface IKafkaSubscribe public interface IKafkaSubscribe
{ {
} }

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka
{
public class KafkaOptions
{
public string BootstrapServers { get; set; }
public string GroupId { get; set; }
public Dictionary<string, string> ProducerConfig { get; set; } = new();
public Dictionary<string, string> ConsumerConfig { get; set; } = new();
public Dictionary<string, string> AdminConfig { get; set; } = new();
}
}

View File

@ -14,57 +14,45 @@ namespace JiShe.CollectBus.Kafka
{ {
public static class KafkaSubcribesExtensions public static class KafkaSubcribesExtensions
{ {
/// <summary> ///// <summary>
/// 添加Kafka订阅 ///// 添加Kafka订阅
/// </summary> ///// </summary>
/// <param name="app"></param> ///// <param name="app"></param>
/// <param name="assembly"></param> ///// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IServiceProvider provider) //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys)
{ //{
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); // var provider = app.ApplicationServices;
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
lifetime.ApplicationStarted.Register(() => // lifetime.ApplicationStarted.Register(() =>
{ // {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); // var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; // int threadCount = 0;
int topicCount = 0; // int topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); // foreach (Assembly assembly in assemblys)
if (string.IsNullOrWhiteSpace(assemblyPath)) // {
{ // var subscribeTypes = assembly.GetTypes()
logger.LogInformation($"kafka订阅未能找到程序路径"); // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
return; // .ToList();
}
var dllFiles = Directory.GetFiles(assemblyPath, "*.dll"); // if (subscribeTypes.Count == 0) return;
foreach (var file in dllFiles) // foreach (var subscribeType in subscribeTypes)
{ // {
// 跳过已加载的程序集 // var subscribes = provider.GetServices(subscribeType).ToList();
var assemblyName = AssemblyName.GetAssemblyName(file); // subscribes.ForEach(subscribe => {
var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName); // if (subscribe is IKafkaSubscribe)
var assembly = existingAssembly ?? Assembly.LoadFrom(file); // {
// 实现IKafkaSubscribe接口 // Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
var subscribeTypes = assembly.GetTypes().Where(type => // threadCount += tuple.Item1;
typeof(IKafkaSubscribe).IsAssignableFrom(type) && // topicCount += tuple.Item2;
!type.IsAbstract && !type.IsInterface).ToList(); ; // }
if (subscribeTypes.Count == 0) // });
continue; // }
foreach (var subscribeType in subscribeTypes) // }
{ // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
var subscribes = provider.GetServices(subscribeType).ToList(); // });
subscribes.ForEach(subscribe => //}
{
if (subscribe!=null)
{
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
}
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{ {
@ -86,7 +74,7 @@ namespace JiShe.CollectBus.Kafka
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => { subscribes.ForEach(subscribe => {
if (subscribe != null) if (subscribe is IKafkaSubscribe)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger); Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger);
threadCount += tuple.Item1; threadCount += tuple.Item1;