using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

namespace JiShe.CollectBus.Kafka
{
    // NOTE(review): the generic type arguments in this file were missing from the text under
    // review and have been reconstructed from how the values are used. Please confirm:
    //   - IAdminClientService / IConsumerService (service interface names),
    //   - SubscribeAsync<dynamic> / SubscribeBatchAsync<dynamic> (message type argument),
    //   - the logger category (created through ILoggerFactory here; the original resolved an
    //     ILogger<T> whose T could not be recovered).

    /// <summary>
    /// Extension methods that create the Kafka topics and start background consumers for
    /// every method marked with <see cref="KafkaSubscribeAttribute"/>.
    /// </summary>
    public static class KafkaSubcribesExtensions
    {
        /// <summary>
        /// Number of consumers started per subscribed method. In DEBUG builds the value is
        /// additionally capped by the topic's partition count.
        /// </summary>
        private const int DefaultConsumersPerTopic = 3;

        /// <summary>
        /// Creates the topics and, once the application has started, scans every
        /// <c>*.dll</c> next to the entry assembly for concrete <c>IKafkaSubscribe</c>
        /// implementations and starts their consumers.
        /// </summary>
        /// <param name="provider">Service provider used to resolve the Kafka services and the subscribers.</param>
        public static void UseKafkaSubscribe(this IServiceProvider provider)
        {
            var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
            var kafkaOptions = EnsureTopicsCreated(provider);

            lifetime.ApplicationStarted.Register(() =>
            {
                var logger = CreateLogger(provider);

                var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
                if (string.IsNullOrWhiteSpace(assemblyPath))
                {
                    logger.LogInformation("kafka订阅未能找到程序路径");
                    return;
                }

                int threadCount = 0;
                int topicCount = 0;

                foreach (var file in Directory.GetFiles(assemblyPath, "*.dll"))
                {
                    // Native or otherwise unloadable DLLs are skipped instead of aborting the scan.
                    var assembly = TryLoadAssembly(file, logger);
                    if (assembly == null)
                    {
                        continue;
                    }

                    // Only concrete implementations of IKafkaSubscribe are considered here.
                    var subscribeTypes = GetLoadableTypes(assembly, logger)
                        .Where(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) && !type.IsAbstract && !type.IsInterface)
                        .ToList();

                    var counts = StartSubscribers(subscribeTypes, provider, logger, kafkaOptions.Value);
                    threadCount += counts.Threads;
                    topicCount += counts.Topics;
                }

                logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
            });
        }

        /// <summary>
        /// Creates the topics and, once the application has started, starts consumers for
        /// the <c>IKafkaSubscribe</c> types found in <paramref name="assembly"/>.
        /// </summary>
        /// <remarks>
        /// Despite the <c>Async</c> suffix this method is synchronous and returns nothing.
        /// Unlike <see cref="UseKafkaSubscribe"/>, abstract and interface types are not
        /// filtered out, so services registered under such a type are resolved as well.
        /// </remarks>
        /// <param name="app">Application builder whose <c>ApplicationServices</c> are used.</param>
        /// <param name="assembly">Assembly to scan for subscriber types.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="app"/> or <paramref name="assembly"/> is <c>null</c>.
        /// </exception>
        public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
        {
            if (app == null)
            {
                throw new ArgumentNullException(nameof(app));
            }

            if (assembly == null)
            {
                throw new ArgumentNullException(nameof(assembly));
            }

            var provider = app.ApplicationServices;
            var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
            var kafkaOptions = EnsureTopicsCreated(provider);

            lifetime.ApplicationStarted.Register(() =>
            {
                var logger = CreateLogger(provider);

                var subscribeTypes = GetLoadableTypes(assembly, logger)
                    .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
                    .ToList();

                if (subscribeTypes.Count == 0)
                {
                    return;
                }

                var counts = StartSubscribers(subscribeTypes, provider, logger, kafkaOptions.Value);
                logger.LogInformation($"kafka订阅主题:{counts.Topics}数,共启动:{counts.Threads}线程");
            });
        }

        /// <summary>
        /// Creates every issued and received topic through the admin client, blocking until
        /// each creation call has completed.
        /// </summary>
        /// <param name="provider">Service provider used to resolve the admin client and options.</param>
        /// <returns>The resolved Kafka options, so callers can reuse them.</returns>
        private static IOptions<KafkaOptionConfig> EnsureTopicsCreated(IServiceProvider provider)
        {
            var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
            var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

            // Concat instead of AddRange: the list returned by the helper is left untouched.
            var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued()
                .Concat(ProtocolConstExtensions.GetAllTopicNamesByReceived());

            foreach (var topic in topics)
            {
                // The public entry points are synchronous, so the call is awaited by blocking.
                kafkaAdminClient
                    .CreateTopicAsync(topic, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor)
                    .ConfigureAwait(false)
                    .GetAwaiter()
                    .GetResult();
            }

            return kafkaOptions;
        }

        /// <summary>
        /// Creates the logger used by the subscription start-up code.
        /// </summary>
        private static ILogger CreateLogger(IServiceProvider provider)
        {
            return provider.GetRequiredService<ILoggerFactory>().CreateLogger(typeof(KafkaSubcribesExtensions));
        }

        /// <summary>
        /// Returns the already loaded assembly matching <paramref name="file"/>, or loads it.
        /// </summary>
        /// <returns>The assembly, or <c>null</c> when the file is not a loadable managed assembly.</returns>
        private static Assembly? TryLoadAssembly(string file, ILogger logger)
        {
            try
            {
                // Reuse an assembly that is already loaded rather than loading it a second time.
                var assemblyName = AssemblyName.GetAssemblyName(file);
                var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
                    .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);

                return existingAssembly ?? Assembly.LoadFrom(file);
            }
            catch (Exception ex) when (ex is BadImageFormatException || ex is FileLoadException || ex is FileNotFoundException)
            {
                logger.LogDebug(ex, "kafka订阅跳过无法加载的程序集:{File}", file);
                return null;
            }
        }

        /// <summary>
        /// Returns the types of <paramref name="assembly"/>, falling back to the subset that
        /// could be loaded when some types fail to load.
        /// </summary>
        private static IEnumerable<Type> GetLoadableTypes(Assembly assembly, ILogger logger)
        {
            try
            {
                return assembly.GetTypes();
            }
            catch (ReflectionTypeLoadException ex)
            {
                logger.LogWarning(ex, "kafka订阅加载程序集类型不完整:{Assembly}", assembly.FullName);

                // Entries for types that failed to load are null; OfType drops them.
                return ex.Types.OfType<Type>().ToList();
            }
        }

        /// <summary>
        /// Resolves every registered service for each type and starts its consumers.
        /// </summary>
        /// <returns>The number of consumer tasks started and the number of subscribed methods.</returns>
        private static (int Threads, int Topics) StartSubscribers(
            IEnumerable<Type> subscribeTypes,
            IServiceProvider provider,
            ILogger logger,
            KafkaOptionConfig kafkaOptionConfig)
        {
            int threadCount = 0;
            int topicCount = 0;

            foreach (var subscribeType in subscribeTypes)
            {
                foreach (var subscribe in provider.GetServices(subscribeType))
                {
                    if (subscribe == null)
                    {
                        continue;
                    }

                    var counts = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptionConfig);
                    threadCount += counts.Threads;
                    topicCount += counts.Topics;
                }
            }

            return (threadCount, topicCount);
        }

        /// <summary>
        /// Starts the background consumers for every method of <paramref name="subscribe"/>
        /// that carries a <see cref="KafkaSubscribeAttribute"/>.
        /// </summary>
        /// <param name="subscribe">Subscriber instance whose public methods are inspected.</param>
        /// <param name="provider">Service provider handed to the consumers.</param>
        /// <param name="logger">Logger handed to the consumers.</param>
        /// <param name="kafkaOptionConfig">Kafka options; currently not read by this method.</param>
        /// <returns>The number of consumer tasks started and the number of subscribed methods.</returns>
        private static (int Threads, int Topics) BuildKafkaSubscribe(
            object subscribe,
            IServiceProvider provider,
            ILogger logger,
            KafkaOptionConfig kafkaOptionConfig)
        {
            var subscribedMethods = subscribe.GetType().GetMethods()
                .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
                .Where(x => x.Attribute != null)
                .ToArray();

            int threadCount = 0;

            foreach (var sub in subscribedMethods)
            {
                var attribute = sub.Attribute!;
                var method = sub.Method;

                int partitionCount = DefaultConsumersPerTopic;
#if DEBUG
                // DEBUG builds only: never start more consumers than the topic has partitions.
                var adminClientService = provider.GetRequiredService<IAdminClientService>();
                int topicPartitions = adminClientService.GetTopicPartitionsNum(attribute.Topic);
                partitionCount = Math.Min(partitionCount, topicPartitions);
#endif
                if (partitionCount <= 0)
                {
                    partitionCount = 1;
                }

                for (int i = 0; i < partitionCount; i++)
                {
                    // Fire-and-forget: StartConsumerAsync logs its own failures.
                    _ = Task.Run(() => StartConsumerAsync(provider, attribute, method, subscribe, logger));
                    threadCount++;
                }
            }

            return (threadCount, subscribedMethods.Length);
        }

        /// <summary>
        /// Subscribes one consumer to the attribute's topic and routes every received
        /// message (or batch) to the subscriber method.
        /// </summary>
        /// <param name="provider">Service provider used to resolve the consumer service.</param>
        /// <param name="attr">Subscription settings (topic, group, batch options).</param>
        /// <param name="method">Subscriber method to invoke.</param>
        /// <param name="subscribe">Instance on which <paramref name="method"/> is invoked.</param>
        /// <param name="logger">Logger for consumption errors.</param>
        private static async Task StartConsumerAsync(
            IServiceProvider provider,
            KafkaSubscribeAttribute attr,
            MethodInfo method,
            object subscribe,
            ILogger logger)
        {
            try
            {
                var consumerService = provider.GetRequiredService<IConsumerService>();

                if (attr.EnableBatch)
                {
                    await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
                    {
                        try
                        {
#if DEBUG
                            logger.LogInformation($"kafka批量消费消息:{message}");
#endif
                            return await ProcessMessageAsync(message.ToList(), method, subscribe);
                        }
                        catch (ConsumeException ex)
                        {
                            logger.LogError(ex, $"kafka批量消费异常:{ex.Message}");
                        }

                        return false;
                    }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
                }
                else
                {
                    await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
                    {
                        try
                        {
#if DEBUG
                            logger.LogInformation($"kafka消费消息:{message}");
#endif
                            return await ProcessMessageAsync(new List<object> { message }, method, subscribe);
                        }
                        catch (ConsumeException ex)
                        {
                            logger.LogError(ex, $"kafka消费异常:{ex.Message}");
                        }

                        return false;
                    }, attr.GroupId);
                }
            }
            catch (Exception ex)
            {
                // This task is started fire-and-forget, so nothing else would observe the failure.
                logger.LogError(ex, "kafka消费线程异常退出,主题:{Topic},分组:{GroupId}", attr.Topic, attr.GroupId);
            }
        }

        /// <summary>
        /// Converts the messages to the subscriber method's first parameter type and invokes
        /// the method.
        /// </summary>
        /// <param name="messages">Raw messages received from Kafka.</param>
        /// <param name="method">Subscriber method to invoke.</param>
        /// <param name="subscribe">Instance on which <paramref name="method"/> is invoked.</param>
        /// <returns>
        /// <c>true</c> when the message(s) were acknowledged by the handler; otherwise <c>false</c>.
        /// </returns>
        private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
        {
            var parameters = method.GetParameters();
            if (parameters.Length == 0)
            {
                return await InvokeHandlerAsync(method, subscribe, null).ConfigureAwait(false);
            }

            var paramType = parameters[0].ParameterType;
            var arguments = new List<object>(messages.Count);

            foreach (var msg in messages)
            {
                // String parameters receive the message as-is; anything else is deserialized
                // from the message's string form. Messages that yield null are dropped.
                var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
                if (data != null)
                {
                    arguments.Add(data);
                }
            }

            if (parameters.Length == 1 && arguments.Count > 1)
            {
                // A batch for a one-parameter handler: passing every message as a separate
                // reflection argument can never match the signature, so the handler is
                // invoked once per message and the batch is acknowledged only if all are.
                bool allAcked = true;
                foreach (var argument in arguments)
                {
                    if (!await InvokeHandlerAsync(method, subscribe, new object?[] { argument }).ConfigureAwait(false))
                    {
                        allAcked = false;
                    }
                }

                return allAcked;
            }

            return await InvokeHandlerAsync(method, subscribe, arguments.ToArray()).ConfigureAwait(false);
        }

        /// <summary>
        /// Invokes the subscriber method through reflection and maps its result to an
        /// acknowledgement flag.
        /// </summary>
        /// <returns>
        /// The <c>Ack</c> value of an <c>ISubscribeAck</c> result (direct or wrapped in a
        /// task), <c>true</c> for any other awaited task, and <c>false</c> otherwise.
        /// </returns>
        private static async Task<bool> InvokeHandlerAsync(MethodInfo method, object subscribe, object?[]? arguments)
        {
            var result = method.Invoke(subscribe, arguments);

            switch (result)
            {
                case Task<ISubscribeAck> ackTask:
                    return (await ackTask.ConfigureAwait(false)).Ack;

                case Task plainTask:
                    await plainTask.ConfigureAwait(false);
                    return true;

                case ISubscribeAck ack:
                    return ack.Ack;

                default:
                    return false;
            }
        }
    }
}