diff --git a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs index 867e2b7..b467162 100644 --- a/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs +++ b/modules/JiShe.CollectBus.Kafka/CollectBusKafkaModule.cs @@ -40,7 +40,7 @@ namespace JiShe.CollectBus.Kafka // 注册Consumer context.Services.AddSingleton(); - context.Services.AddHostedService(); + //context.Services.AddHostedService(); } public override void OnApplicationInitialization(ApplicationInitializationContext context) @@ -48,7 +48,7 @@ namespace JiShe.CollectBus.Kafka var app = context.GetApplicationBuilder(); // 注册Subscriber - //app.ApplicationServices.UseKafkaSubscribe(); + app.ApplicationServices.UseKafkaSubscribe(); // 获取程序集 //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); diff --git a/modules/JiShe.CollectBus.Kafka/HostedService.cs b/modules/JiShe.CollectBus.Kafka/HostedService.cs index dcd5197..c2e672c 100644 --- a/modules/JiShe.CollectBus.Kafka/HostedService.cs +++ b/modules/JiShe.CollectBus.Kafka/HostedService.cs @@ -32,8 +32,6 @@ namespace JiShe.CollectBus.Kafka public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("结束"); - - return Task.CompletedTask; } diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs index 52e5c93..ff60130 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs @@ -25,9 +25,9 @@ namespace JiShe.CollectBus.Kafka /// /// /// - public static async Task UseKafkaSubscribe(this IServiceProvider provider) + public static void UseKafkaSubscribe(this IServiceProvider provider) { - //var lifetime = provider.GetRequiredService(); + var lifetime = provider.GetRequiredService(); //初始化主题信息 var kafkaAdminClient = provider.GetRequiredService(); @@ -40,10 +40,9 @@ namespace JiShe.CollectBus.Kafka { kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); } - List tasks = new List(); - //lifetime.ApplicationStarted.Register(async() => - //{ - var logger = provider.GetRequiredService>(); + lifetime.ApplicationStarted.Register(() => + { + var logger = provider.GetRequiredService>(); int threadCount = 0; int topicCount = 0; var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); @@ -62,19 +61,19 @@ namespace JiShe.CollectBus.Kafka var assembly = existingAssembly ?? Assembly.LoadFrom(file); // 实现IKafkaSubscribe接口 var subscribeTypes = assembly.GetTypes().Where(type => - typeof(IKafkaSubscribe).IsAssignableFrom(type) && + typeof(IKafkaSubscribe).IsAssignableFrom(type) && !type.IsAbstract && !type.IsInterface).ToList(); ; - if (subscribeTypes.Count == 0) + if (subscribeTypes.Count == 0) continue; - + foreach (var subscribeType in subscribeTypes) { var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(async subscribe => + subscribes.ForEach(subscribe => { - if (subscribe!=null) + if (subscribe != null) { - Tuple tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value,tasks); + Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -82,59 +81,58 @@ namespace JiShe.CollectBus.Kafka } } logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); - //}); - await Task.WhenAll(tasks); + }); } - //public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly) - //{ - // var provider = app.ApplicationServices; - // var lifetime = provider.GetRequiredService(); - // //初始化主题信息 - // var kafkaAdminClient = provider.GetRequiredService(); - // var kafkaOptions = provider.GetRequiredService>(); + public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly) + { + var provider = app.ApplicationServices; + var lifetime = provider.GetRequiredService(); + //初始化主题信息 + var kafkaAdminClient = provider.GetRequiredService(); + var kafkaOptions = provider.GetRequiredService>(); - // List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); - // topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); + List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); - // foreach (var item in topics) - // { - // kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); - // } + foreach (var item in topics) + { + kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult(); + } + lifetime.ApplicationStarted.Register(() => + { + var logger = provider.GetRequiredService>(); + int threadCount = 0; + int topicCount = 0; + var subscribeTypes = assembly.GetTypes() + .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) + .ToList(); - // lifetime.ApplicationStarted.Register(async () => - // { - // var logger = provider.GetRequiredService>(); - // int threadCount = 0; - // int topicCount = 0; - // var subscribeTypes = assembly.GetTypes() - // .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t)) - // .ToList(); + if (subscribeTypes.Count == 0) return; + foreach (var subscribeType in subscribeTypes) + { + var subscribes = provider.GetServices(subscribeType).ToList(); + subscribes.ForEach(subscribe => + { - // if (subscribeTypes.Count == 0) return; - // foreach (var subscribeType in subscribeTypes) - // { - // var subscribes = provider.GetServices(subscribeType).ToList(); - // subscribes.ForEach(async subscribe => { - - // if (subscribe != null) - // { - // Tuple tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); - // threadCount += tuple.Item1; - // topicCount += tuple.Item2; - // } - // }); - // } - // logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); - // }); - //} + if (subscribe != null) + { + Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); + threadCount += tuple.Item1; + topicCount += tuple.Item2; + } + }); + } + logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); + }); + } /// /// 构建Kafka订阅 /// /// /// - private static async Task> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig, List tasks) + private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) @@ -157,11 +155,11 @@ namespace JiShe.CollectBus.Kafka for (int i = 0; i < partitionCount; i++) { //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName) - tasks.Add( Task.Run(()=> StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger))); + Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); threadCount++; } } - return await Task.FromResult(Tuple.Create(threadCount, subscribedMethods.Length)); + return Tuple.Create(threadCount, subscribedMethods.Length); } ///