移除测试日志

This commit is contained in:
zenghongyao 2025-04-18 09:28:48 +08:00
parent c93feb631d
commit cb36fc463a
3 changed files with 55 additions and 59 deletions

View File

@ -40,7 +40,7 @@ namespace JiShe.CollectBus.Kafka
// 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>();
context.Services.AddHostedService<HostedService>();
//context.Services.AddHostedService<HostedService>();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
@ -48,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
var app = context.GetApplicationBuilder();
// 注册Subscriber
//app.ApplicationServices.UseKafkaSubscribe();
app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));

View File

@ -32,8 +32,6 @@ namespace JiShe.CollectBus.Kafka
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("结束");
return Task.CompletedTask;
}

View File

@ -25,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
/// </summary>
/// <param name="app"></param>
/// <param name="assembly"></param>
public static async Task UseKafkaSubscribe(this IServiceProvider provider)
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
//var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
@ -40,10 +40,9 @@ namespace JiShe.CollectBus.Kafka
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
List<Task> tasks = new List<Task>();
//lifetime.ApplicationStarted.Register(async() =>
//{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
@ -62,19 +61,19 @@ namespace JiShe.CollectBus.Kafka
var assembly = existingAssembly ?? Assembly.LoadFrom(file);
// 实现IKafkaSubscribe接口
var subscribeTypes = assembly.GetTypes().Where(type =>
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
typeof(IKafkaSubscribe).IsAssignableFrom(type) &&
!type.IsAbstract && !type.IsInterface).ToList(); ;
if (subscribeTypes.Count == 0)
if (subscribeTypes.Count == 0)
continue;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(async subscribe =>
subscribes.ForEach(subscribe =>
{
if (subscribe!=null)
if (subscribe != null)
{
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
@ -82,59 +81,58 @@ namespace JiShe.CollectBus.Kafka
}
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
//});
await Task.WhenAll(tasks);
});
}
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
//{
// var provider = app.ApplicationServices;
// var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
// //初始化主题信息
// var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
// var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
{
var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
// List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
// topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
// foreach (var item in topics)
// {
// kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
// }
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
// lifetime.ApplicationStarted.Register(async () =>
// {
// var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
// int threadCount = 0;
// int topicCount = 0;
// var subscribeTypes = assembly.GetTypes()
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
// .ToList();
if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe =>
{
// if (subscribeTypes.Count == 0) return;
// foreach (var subscribeType in subscribeTypes)
// {
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(async subscribe => {
// if (subscribe != null)
// {
// Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
// }
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
// });
//}
if (subscribe != null)
{
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
/// <summary>
/// 构建Kafka订阅
/// </summary>
/// <param name="subscribe"></param>
/// <param name="provider"></param>
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
private static Tuple<int,int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -157,11 +155,11 @@ namespace JiShe.CollectBus.Kafka
for (int i = 0; i < partitionCount; i++)
{
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
threadCount++;
}
}
return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length));
return Tuple.Create(threadCount, subscribedMethods.Length);
}
/// <summary>