using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;
using YamlDotNet.Core.Tokens;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace JiShe.CollectBus.Kafka
{
    /// <summary>
    /// Extension methods that create the protocol Kafka topics and start background
    /// consumers for every method decorated with <see cref="KafkaSubscribeAttribute"/>
    /// on a registered <see cref="IKafkaSubscribe"/> service.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the type arguments <c>IAdminClientService</c>, <c>IConsumerService</c>,
    /// <c>ILogger&lt;CollectBusKafkaModule&gt;</c> and the <c>dynamic</c> message type were not
    /// legible in the reviewed copy of this file and have been restored from usage.
    /// Confirm them against the project before merging.
    /// </remarks>
    public static class KafkaSubcribesExtensions
    {
        /// <summary>
        /// Number of consumer tasks started per subscribed method. In DEBUG builds this is
        /// additionally capped by the topic's actual partition count.
        /// </summary>
        private const int DefaultConsumersPerSubscription = 3;

        /// <summary>
        /// Creates every issued and received protocol topic through the admin client.
        /// </summary>
        /// <param name="provider">Service provider used to resolve the admin client and Kafka options.</param>
        public static void UseInitKafkaTopic(this IServiceProvider provider)
        {
            var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
            var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

            // Build our own sequence instead of AddRange-ing into the list returned by
            // GetAllTopicNamesByIssued(), so a collection owned by someone else is never mutated.
            var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued()
                .Concat(ProtocolConstExtensions.GetAllTopicNamesByReceived())
                .ToList();

            foreach (var topic in topics)
            {
                // This extension point is synchronous, so topic creation is awaited by blocking.
                kafkaAdminClient
                    .CreateTopicAsync(topic, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor)
                    .ConfigureAwait(false)
                    .GetAwaiter()
                    .GetResult();
            }
        }

        /// <summary>
        /// Registers an application-started callback that scans every <c>*.dll</c> next to the
        /// entry assembly for concrete <see cref="IKafkaSubscribe"/> implementations and starts
        /// consumers for the instances registered in <paramref name="provider"/>.
        /// </summary>
        /// <param name="provider">Service provider used to resolve subscribers and Kafka services.</param>
        public static void UseKafkaSubscribe(this IServiceProvider provider)
        {
            var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
            var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

            lifetime.ApplicationStarted.Register(() =>
            {
                var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();

                var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
                if (string.IsNullOrWhiteSpace(assemblyPath))
                {
                    logger.LogInformation($"kafka订阅未能找到程序路径");
                    return;
                }

                int threadCount = 0;
                int topicCount = 0;

                foreach (var file in Directory.GetFiles(assemblyPath, "*.dll"))
                {
                    // Only concrete implementations can be resolved and started.
                    var subscribeTypes = LoadTypesFromFile(file, logger)
                        .Where(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) && !type.IsAbstract && !type.IsInterface)
                        .ToList();
                    if (subscribeTypes.Count == 0)
                    {
                        continue;
                    }

                    var (startedThreads, startedTopics) = StartSubscribers(provider, subscribeTypes, logger, kafkaOptions.Value);
                    threadCount += startedThreads;
                    topicCount += startedTopics;
                }

                logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
            });
        }

        /// <summary>
        /// Registers an application-started callback that starts consumers for every
        /// <see cref="IKafkaSubscribe"/> type found in <paramref name="assembly"/>.
        /// </summary>
        /// <param name="app">Application builder whose <c>ApplicationServices</c> resolve the subscribers.</param>
        /// <param name="assembly">Assembly to scan for subscriber types.</param>
        public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
        {
            var provider = app.ApplicationServices;
            var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
            var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

            lifetime.ApplicationStarted.Register(() =>
            {
                var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();

                // Unlike UseKafkaSubscribe, abstract classes and interfaces are not filtered
                // out here; the container simply yields whatever is registered for each type.
                var subscribeTypes = assembly.GetTypes()
                    .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
                    .ToList();
                if (subscribeTypes.Count == 0)
                {
                    return;
                }

                var (threadCount, topicCount) = StartSubscribers(provider, subscribeTypes, logger, kafkaOptions.Value);
                logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
            });
        }

        /// <summary>
        /// Returns the types defined in the assembly stored at <paramref name="file"/>, reusing an
        /// already loaded assembly with the same full name when there is one.
        /// </summary>
        /// <param name="file">Path of the DLL to inspect.</param>
        /// <param name="logger">Logger that records files which are skipped.</param>
        /// <returns>
        /// The loadable types; an empty list when the file is not a loadable managed assembly.
        /// </returns>
        private static List<Type> LoadTypesFromFile(string file, ILogger logger)
        {
            try
            {
                var assemblyName = AssemblyName.GetAssemblyName(file);
                var assembly = AppDomain.CurrentDomain.GetAssemblies()
                                   .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName)
                               ?? Assembly.LoadFrom(file);
                return assembly.GetTypes().ToList();
            }
            catch (ReflectionTypeLoadException ex)
            {
                // Some types reference missing dependencies; keep the ones that did load.
                logger.LogWarning(ex, "Only part of the types in '{File}' could be loaded while scanning for Kafka subscribers", file);
                return ex.Types.OfType<Type>().ToList();
            }
            catch (Exception ex) when (ex is BadImageFormatException or System.IO.IOException)
            {
                // A native or otherwise unloadable DLL must not abort the whole scan.
                logger.LogDebug(ex, "Skipping '{File}' while scanning for Kafka subscribers: not a loadable .NET assembly", file);
                return new List<Type>();
            }
        }

        /// <summary>
        /// Resolves every registered instance of each subscriber type and starts its consumers.
        /// </summary>
        /// <returns>The total number of consumer tasks started and subscribed methods found.</returns>
        private static (int ThreadCount, int TopicCount) StartSubscribers(
            IServiceProvider provider,
            IEnumerable<Type> subscribeTypes,
            ILogger logger,
            KafkaOptionConfig kafkaOptionConfig)
        {
            int threadCount = 0;
            int topicCount = 0;

            foreach (var subscribeType in subscribeTypes)
            {
                foreach (var subscribe in provider.GetServices(subscribeType).ToList())
                {
                    if (subscribe is null)
                    {
                        continue;
                    }

                    var counts = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptionConfig);
                    threadCount += counts.Item1;
                    topicCount += counts.Item2;
                }
            }

            return (threadCount, topicCount);
        }

        /// <summary>
        /// Starts background consumer tasks for every public method of <paramref name="subscribe"/>
        /// that carries a <see cref="KafkaSubscribeAttribute"/>.
        /// </summary>
        /// <param name="subscribe">Subscriber instance whose methods are inspected.</param>
        /// <param name="provider">Service provider handed to each consumer.</param>
        /// <param name="logger">Logger used by the consumers.</param>
        /// <param name="kafkaOptionConfig">Kafka options; currently not consulted for the consumer count.</param>
        /// <returns>Item1: number of consumer tasks started; Item2: number of subscribed methods.</returns>
        private static Tuple<int, int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig)
        {
            var subscribedMethods = subscribe.GetType().GetMethods()
                .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
                .Where(x => x.Attribute != null)
                .ToArray();

            int threadCount = 0;
            foreach (var sub in subscribedMethods)
            {
                int partitionCount = DefaultConsumersPerSubscription;
#if DEBUG
                // In DEBUG builds never start more consumers than the topic has partitions.
                var adminClientService = provider.GetRequiredService<IAdminClientService>();
                int topicPartitions = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
                partitionCount = Math.Min(partitionCount, topicPartitions);
#endif
                if (partitionCount <= 0)
                {
                    partitionCount = 1;
                }

                for (int i = 0; i < partitionCount; i++)
                {
                    // Fire-and-forget by design; RunConsumerAsync logs a failure so that a dead
                    // consumer does not disappear silently.
                    _ = Task.Run(() => RunConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
                    threadCount++;
                }
            }

            return Tuple.Create(threadCount, subscribedMethods.Length);
        }

        /// <summary>
        /// Runs <see cref="StartConsumerAsync"/> and logs any exception that ends the consumer.
        /// </summary>
        private static async Task RunConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger)
        {
            try
            {
                await StartConsumerAsync(provider, attr, method, subscribe, logger).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Kafka consumer for topic '{Topic}' (group '{GroupId}') stopped unexpectedly", attr.Topic, attr.GroupId);
            }
        }

        /// <summary>
        /// Subscribes to the attribute's topic (batch or single-message mode) and dispatches each
        /// delivery to <paramref name="method"/> on <paramref name="subscribe"/>.
        /// </summary>
        /// <param name="provider">Service provider used to resolve the consumer service.</param>
        /// <param name="attr">Subscription settings (topic, group, batch options).</param>
        /// <param name="method">Handler method to invoke.</param>
        /// <param name="subscribe">Instance the handler is invoked on.</param>
        /// <param name="logger">Logger for consumed messages (DEBUG) and failures.</param>
        private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger)
        {
            var consumerService = provider.GetRequiredService<IConsumerService>();

            if (attr.EnableBatch)
            {
                await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
                {
                    try
                    {
#if DEBUG
                        logger.LogInformation($"kafka批量消费消息:{message.Serialize()}");
#endif
                        return await ProcessMessageAsync(message.ToList(), method, subscribe);
                    }
                    catch (ConsumeException ex)
                    {
                        logger.LogError(ex, "kafka批量消费异常:{Message}", ex.Message);
                    }
                    catch (Exception ex)
                    {
                        // Handler failures (conversion errors, exceptions thrown by the invoked
                        // method) are reported as "not acknowledged" instead of escaping the callback.
                        logger.LogError(ex, "kafka批量消费异常:{Message}", ex.Message);
                    }

                    return false;
                }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
            }
            else
            {
                await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
                {
                    try
                    {
#if DEBUG
                        logger.LogInformation($"kafka消费消息:{message}");
#endif
                        return await ProcessMessageAsync(new List<dynamic> { message }, method, subscribe);
                    }
                    catch (ConsumeException ex)
                    {
                        logger.LogError(ex, "kafka消费异常:{Message}", ex.Message);
                    }
                    catch (Exception ex)
                    {
                        // See the batch branch: a failing handler yields "not acknowledged".
                        logger.LogError(ex, "kafka消费异常:{Message}", ex.Message);
                    }

                    return false;
                }, attr.GroupId);
            }
        }

        /// <summary>
        /// Converts the consumed messages to the handler's parameter types, invokes the handler
        /// and translates its result into an acknowledgement flag.
        /// </summary>
        /// <param name="messages">Messages delivered by the consumer.</param>
        /// <param name="method">Handler method to invoke.</param>
        /// <param name="subscribe">Instance the handler is invoked on.</param>
        /// <returns>
        /// The <c>Ack</c> value of a returned <see cref="ISubscribeAck"/> (directly or via
        /// <c>Task&lt;ISubscribeAck&gt;</c>); <c>true</c> for any other completed <see cref="Task"/>;
        /// otherwise <c>false</c>.
        /// </returns>
        private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe)
        {
            var parameterDescriptors = method.GetParameters();
            object?[]? executeParameters = null;

            if (parameterDescriptors.Length > 0)
            {
                // Item2 is the element type when the handler takes a collection, otherwise null.
                var parameterTypeInfo = method.GetParameterTypeInfo();
                Type? elementType = parameterTypeInfo.Item2;

                executeParameters = new object?[parameterDescriptors.Length];
                for (var i = 0; i < parameterDescriptors.Length; i++)
                {
                    var parameterType = parameterDescriptors[i].ParameterType;

                    // A fresh list per parameter: sharing one instance made every parameter see
                    // the items accumulated for the parameters before it.
                    IList? list = elementType != null
                        ? (IList)Activator.CreateInstance(typeof(List<>).MakeGenericType(elementType))!
                        : null;

                    foreach (var item in messages)
                    {
                        object? converted = ConvertMessage(item, elementType, parameterType);
                        if (list is null)
                        {
                            // Scalar parameter: the last message wins.
                            executeParameters[i] = converted;
                        }
                        else
                        {
                            list.Add(converted);
                        }
                    }

                    if (list is { Count: > 0 })
                    {
                        executeParameters[i] = list;
                    }
                }
            }

            var result = method.Invoke(subscribe, executeParameters);

            if (result is Task<ISubscribeAck> genericTask)
            {
                var ack = await genericTask.ConfigureAwait(false);
                return ack is not null && ack.Ack;
            }

            // Any other task, including Task<T> with a different T, counts as acknowledged
            // once it completes successfully.
            if (result is Task nonGenericTask)
            {
                await nonGenericTask.ConfigureAwait(false);
                return true;
            }

            if (result is ISubscribeAck ackResult)
            {
                return ackResult.Ack;
            }

            return false;
        }

        /// <summary>
        /// Converts a single consumed message to the type expected by a handler parameter.
        /// </summary>
        /// <param name="item">The raw consumed message.</param>
        /// <param name="elementType">Element type of a collection parameter, or <c>null</c>.</param>
        /// <param name="parameterType">Declared type of the handler parameter.</param>
        /// <returns>The converted value.</returns>
        private static object? ConvertMessage(dynamic item, Type? elementType, Type parameterType)
        {
            if (KafkaSerialization.IsJsonType(item))
            {
                return KafkaSerialization.Deserialize(item, elementType ?? parameterType);
            }

            var converter = TypeDescriptor.GetConverter(parameterType);
            if (converter.CanConvertFrom(item.GetType()))
            {
                return converter.ConvertFrom(item);
            }

            if (parameterType.IsInstanceOfType(item))
            {
                return item;
            }

            return Convert.ChangeType(item, parameterType);
        }
    }
}