Compare commits

..

No commits in common. "17b73b6a2ab3302c114986a8a7d8daad39b1e7ca" and "7704c6374ed9f6a3989d2a96dde8c2d1d4cdf217" have entirely different histories.

3 changed files with 59 additions and 55 deletions

View File

@ -40,7 +40,7 @@ namespace JiShe.CollectBus.Kafka
// 注册Consumer // 注册Consumer
context.Services.AddSingleton<IConsumerService, ConsumerService>(); context.Services.AddSingleton<IConsumerService, ConsumerService>();
//context.Services.AddHostedService<HostedService>(); context.Services.AddHostedService<HostedService>();
} }
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
@ -48,7 +48,7 @@ namespace JiShe.CollectBus.Kafka
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 注册Subscriber // 注册Subscriber
app.ApplicationServices.UseKafkaSubscribe(); //app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集 // 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));

View File

@ -32,6 +32,8 @@ namespace JiShe.CollectBus.Kafka
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
_logger.LogInformation("结束"); _logger.LogInformation("结束");
return Task.CompletedTask; return Task.CompletedTask;
} }

View File

@ -25,9 +25,9 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="app"></param> /// <param name="app"></param>
/// <param name="assembly"></param> /// <param name="assembly"></param>
public static void UseKafkaSubscribe(this IServiceProvider provider) public static async Task UseKafkaSubscribe(this IServiceProvider provider)
{ {
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); //var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
@ -40,8 +40,9 @@ namespace JiShe.CollectBus.Kafka
{ {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} }
lifetime.ApplicationStarted.Register(() => List<Task> tasks = new List<Task>();
{ //lifetime.ApplicationStarted.Register(async() =>
//{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0; int threadCount = 0;
int topicCount = 0; int topicCount = 0;
@ -69,11 +70,11 @@ namespace JiShe.CollectBus.Kafka
foreach (var subscribeType in subscribeTypes) foreach (var subscribeType in subscribeTypes)
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => subscribes.ForEach(async subscribe =>
{ {
if (subscribe!=null) if (subscribe!=null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -81,58 +82,59 @@ namespace JiShe.CollectBus.Kafka
} }
} }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); //});
await Task.WhenAll(tasks);
} }
public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly) //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{ //{
var provider = app.ApplicationServices; // var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); // var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息 // //初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>(); // var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>(); // var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); // List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); // topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics) // foreach (var item in topics)
{ // {
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); // kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
} // }
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
int threadCount = 0;
int topicCount = 0;
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return; // lifetime.ApplicationStarted.Register(async () =>
foreach (var subscribeType in subscribeTypes) // {
{ // var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
var subscribes = provider.GetServices(subscribeType).ToList(); // int threadCount = 0;
subscribes.ForEach(subscribe => // int topicCount = 0;
{ // var subscribeTypes = assembly.GetTypes()
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
// .ToList();
if (subscribe != null) // if (subscribeTypes.Count == 0) return;
{ // foreach (var subscribeType in subscribeTypes)
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); // {
threadCount += tuple.Item1; // var subscribes = provider.GetServices(subscribeType).ToList();
topicCount += tuple.Item2; // subscribes.ForEach(async subscribe => {
}
}); // if (subscribe != null)
} // {
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); // Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
}); // threadCount += tuple.Item1;
} // topicCount += tuple.Item2;
// }
// });
// }
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
// });
//}
/// <summary> /// <summary>
/// 构建Kafka订阅 /// 构建Kafka订阅
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig) private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig, List<Task> tasks)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -155,11 +157,11 @@ namespace JiShe.CollectBus.Kafka
for (int i = 0; i < partitionCount; i++) for (int i = 0; i < partitionCount; i++)
{ {
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName) //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)));
threadCount++; threadCount++;
} }
} }
return Tuple.Create(threadCount, subscribedMethods.Length); return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length));
} }
/// <summary> /// <summary>