JiShe.CollectBus/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
2025-05-23 14:01:33 +08:00

436 lines
20 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections;
using System.ComponentModel;
using System.Reflection;
namespace JiShe.CollectBus.Kafka
{
public static class KafkaSubscribeExtensions
{
private static long _threadCount = 0;
private static long _topicSubscribeCount = 0;
private static long _threadStartCount = 0;
/// <summary>
/// Requests creation of every protocol topic (issued and received) through the Kafka admin client.
/// </summary>
/// <param name="provider">Service provider used to resolve <see cref="IAdminClientService"/> and the Kafka options.</param>
/// <remarks>
/// Topics are processed one at a time and the calling thread blocks until each creation call has completed.
/// </remarks>
public static void UseInitKafkaTopic(this IServiceProvider provider)
{
    var adminClient = provider.GetRequiredService<IAdminClientService>();
    var options = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

    // Issued topic names first, with the received topic names appended to the same list.
    var allTopics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
    allTopics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());

    foreach (var topicName in allTopics)
    {
        var creation = adminClient.CreateTopicAsync(
            topicName,
            options.Value.NumPartitions,
            options.Value.KafkaReplicationFactor);

        // The method is synchronous, so wait for the creation call before moving to the next topic.
        creation.ConfigureAwait(false).GetAwaiter().GetResult();
    }
}
/// <summary>
/// Registers a callback, run once the application has started, that scans every <c>*.dll</c> in the entry
/// assembly's directory for concrete <see cref="IKafkaSubscribe"/> implementations and builds the Kafka
/// subscriptions for each instance resolved from the container.
/// </summary>
/// <param name="provider">Service provider used to resolve the host lifetime, Kafka options, logger and subscriber instances.</param>
/// <remarks>
/// Files that are not loadable managed assemblies (for example native libraries) are skipped, and assemblies
/// whose types can only be partially loaded are scanned using the types that did load, so one problematic
/// file no longer aborts the whole subscription start-up.
/// </remarks>
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
    var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
    var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
    lifetime.ApplicationStarted.Register(() =>
    {
        var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
        var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
        if (string.IsNullOrWhiteSpace(assemblyPath))
        {
            logger.LogWarning($"kafka订阅未能找到程序路径");
            return;
        }

        foreach (var file in Directory.GetFiles(assemblyPath, "*.dll"))
        {
            Assembly assembly;
            try
            {
                // Reuse an assembly that is already loaded instead of loading the file a second time.
                var assemblyName = AssemblyName.GetAssemblyName(file);
                assembly = AppDomain.CurrentDomain.GetAssemblies()
                    .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName)
                    ?? Assembly.LoadFrom(file);
            }
            catch (Exception ex) when (ex is BadImageFormatException || ex is FileLoadException)
            {
                // Native or otherwise unloadable DLLs cannot contain subscribers; skip them.
                logger.LogDebug(ex, "Kafka subscribe scan skipped {File}: not a loadable managed assembly", file);
                continue;
            }

            Type?[] candidateTypes;
            try
            {
                candidateTypes = assembly.GetTypes();
            }
            catch (ReflectionTypeLoadException ex)
            {
                // Some types failed to load (e.g. a missing dependency); continue with the ones that did.
                logger.LogWarning(ex, "Kafka subscribe scan could only partially load types from {File}", file);
                candidateTypes = ex.Types;
            }

            // Concrete classes implementing IKafkaSubscribe.
            var subscribeTypes = candidateTypes
                .Where(type => type != null
                    && typeof(IKafkaSubscribe).IsAssignableFrom(type)
                    && !type.IsAbstract
                    && !type.IsInterface)
                .Select(type => type!)
                .ToList();
            if (subscribeTypes.Count == 0)
            {
                continue;
            }

            // Subscriber types, and the instances registered for each type, are processed in parallel.
            Parallel.ForEach(subscribeTypes, subscribeType =>
            {
                var subscribes = provider.GetServices(subscribeType).ToList();
                Parallel.ForEach(subscribes, subscribe =>
                {
                    if (subscribe != null)
                    {
                        BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
                    }
                });
            });
        }

        // The totals come from the shared static counters maintained while building subscriptions.
        logger.LogWarning($"kafka订阅主题:{Interlocked.Read(ref _topicSubscribeCount)}数,共启动:{Interlocked.Read(ref _threadCount)}线程");

        var kafkaTaskScheduler = provider.GetRequiredService<KafkaTaskScheduler>();
        kafkaTaskScheduler.ExceptionEvent += (ex) =>
        {
            logger.LogError(ex, "Kafka任务调度异常");
        };
        // For scheduler diagnostics during testing, kafkaTaskScheduler.WorkerThreads and
        // kafkaTaskScheduler.QueuedTasks can be logged periodically from a background task started here.
    });
}
/// <summary>
/// Registers a callback, run once the application has started, that builds the Kafka subscriptions for
/// every concrete <see cref="IKafkaSubscribe"/> implementation found in <paramref name="assembly"/>.
/// </summary>
/// <param name="app">Application builder whose <c>ApplicationServices</c> is used to resolve dependencies and subscribers.</param>
/// <param name="assembly">Assembly to scan for subscriber types.</param>
/// <remarks>
/// Despite the <c>Async</c> suffix this method returns immediately after registering the callback.
/// The summary log reports how many topic subscriptions and consumers were added by this call.
/// </remarks>
public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
{
    var provider = app.ApplicationServices;
    var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
    var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
    lifetime.ApplicationStarted.Register(() =>
    {
        var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();

        // Same filter as UseKafkaSubscribe: only concrete classes can be resolved as subscribers.
        var subscribeTypes = assembly.GetTypes()
            .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t) && !t.IsAbstract && !t.IsInterface)
            .ToList();
        if (subscribeTypes.Count == 0)
        {
            return;
        }

        // The static counters are process-wide, so snapshot them and report only what this call added.
        long topicsBefore = Interlocked.Read(ref _topicSubscribeCount);
        long threadsBefore = Interlocked.Read(ref _threadCount);

        Parallel.ForEach(subscribeTypes, subscribeType =>
        {
            var subscribes = provider.GetServices(subscribeType).ToList();
            Parallel.ForEach(subscribes, subscribe =>
            {
                if (subscribe != null)
                {
                    BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
                }
            });
        });

        long topicCount = Interlocked.Read(ref _topicSubscribeCount) - topicsBefore;
        long threadCount = Interlocked.Read(ref _threadCount) - threadsBefore;
        logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
    });
}
/// <summary>
/// Starts the Kafka consumers for every method of <paramref name="subscribe"/> that carries a
/// <see cref="KafkaSubscribeAttribute"/>.
/// </summary>
/// <param name="subscribe">Subscriber instance whose public methods are inspected.</param>
/// <param name="provider">Service provider used to resolve the admin client and, later, the consumer service.</param>
/// <param name="logger">Logger used for consumer diagnostics.</param>
/// <param name="kafkaOptionConfig">Kafka options; <c>TaskThreadCount</c> supplies the default consumer count when the attribute does not set one.</param>
/// <returns>
/// A tuple of (number of consumers started, number of subscribed methods) for this subscriber instance.
/// </returns>
/// <remarks>
/// The consumer count per method is the attribute's <c>TaskCount</c>; when that is -1 the configured
/// <c>TaskThreadCount</c> is used; when that is also -1 the topic's partition count is used.
/// The value is capped at the partition count and is never less than 1.
/// </remarks>
private static Tuple<int, int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{
    var subscribedMethods = subscribe.GetType().GetMethods()
        .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
        .Where(x => x.Attribute != null)
        .ToArray();

    int threadCount = 0;
    Parallel.ForEach(subscribedMethods, sub =>
    {
        var attribute = sub.Attribute!;
        Interlocked.Increment(ref _topicSubscribeCount);

        var adminClientService = provider.GetRequiredService<IAdminClientService>();
        int topicPartitions = adminClientService.GetTopicPartitionsNum(attribute.Topic);

        // Attribute setting wins, then the configured TaskThreadCount, then the partition count.
        int requested;
        if (attribute.TaskCount != -1)
        {
            requested = attribute.TaskCount;
        }
        else if (kafkaOptionConfig.TaskThreadCount != -1)
        {
            requested = kafkaOptionConfig.TaskThreadCount;
        }
        else
        {
            requested = topicPartitions;
        }

        // Cap at the partition count, but always start at least one consumer.
        int consumerCount = Math.Max(1, Math.Min(requested, topicPartitions));

        Parallel.For(0, consumerCount, index =>
        {
            Interlocked.Increment(ref _threadCount);
            Interlocked.Increment(ref threadCount);

            // Fire and forget through a Task-returning wrapper: an async lambda here would compile to
            // async void, and any consumer failure would then escape unobserved.
            _ = RunConsumerSafelyAsync(provider, attribute, sub.Method, subscribe, logger);
        });
    });

    return Tuple.Create(threadCount, subscribedMethods.Length);
}

/// <summary>
/// Runs a single consumer via <c>StartConsumerAsync</c> and logs any failure instead of letting it propagate.
/// </summary>
private static async Task RunConsumerSafelyAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
    try
    {
        await StartConsumerAsync(provider, attr, method, subscribe, logger);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Kafka consumer for topic {Topic} terminated unexpectedly", attr.Topic);
    }
}
/// <summary>
/// Subscribes one consumer to the topic described by <paramref name="attr"/> and routes each received
/// message (or batch) to <paramref name="method"/> on <paramref name="subscribe"/>.
/// </summary>
/// <param name="provider">Service provider used to resolve <see cref="IConsumerService"/>.</param>
/// <param name="attr">Subscription settings: topic, group id, and batch options.</param>
/// <param name="method">Handler method to invoke for received messages.</param>
/// <param name="subscribe">Instance on which <paramref name="method"/> is invoked.</param>
/// <param name="logger">Logger for start-up and failure diagnostics.</param>
/// <remarks>
/// The message callback returns the handler's acknowledgement result. Any exception raised while handling
/// a message is logged with its full details and reported to the consumer service as <c>false</c>.
/// </remarks>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
    var consumerService = provider.GetRequiredService<IConsumerService>();

    // Log the value returned by the atomic increment; re-reading the shared field could show another
    // consumer's number.
    long startedIndex = Interlocked.Increment(ref _threadStartCount);
    logger.LogInformation($"kafka开启线程消费:{startedIndex}");

    if (attr.EnableBatch)
    {
        await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
        {
            try
            {
#if DEBUG
                logger.LogInformation($"kafka批量消费消息:{message.Serialize()}");
#endif
                // Hand the whole batch to the handler.
                return await ProcessMessageAsync(message.ToList(), method, subscribe);
            }
            catch (Exception ex)
            {
                // Pass the exception object so the stack trace and inner exceptions are recorded.
                logger.LogError(ex, $"kafka批量消费异常:{ex.Message}");
                return false;
            }
        }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
    }
    else
    {
        await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
        {
            try
            {
#if DEBUG
                logger.LogInformation($"kafka消费消息:{message}");
#endif
                // Wrap the single message so it goes through the same processing path as a batch.
                return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
            }
            catch (Exception ex)
            {
                // Pass the exception object so the stack trace and inner exceptions are recorded.
                logger.LogError(ex, $"kafka消费异常:{ex.Message}");
                return false;
            }
        }, attr.GroupId);
    }
}
/// <summary>
/// Converts the received messages into the arguments expected by <paramref name="method"/>, invokes it on
/// <paramref name="subscribe"/>, and translates its result into an acknowledgement flag.
/// </summary>
/// <param name="messages">Messages received from Kafka (one element for single consumption, several for a batch).</param>
/// <param name="method">Handler method to invoke.</param>
/// <param name="subscribe">Instance on which the handler is invoked.</param>
/// <returns>
/// For a handler returning <c>Task&lt;T&gt;</c> where <c>T</c> implements <see cref="ISubscribeAck"/>: the awaited
/// result's <c>Ack</c> value (<c>false</c> if the result is null). For any other task: <c>true</c> once it
/// completes. For a synchronous <see cref="ISubscribeAck"/>: its <c>Ack</c> value. Otherwise <c>false</c>.
/// </returns>
/// <remarks>
/// Exceptions thrown synchronously by the handler are rethrown as the original exception rather than the
/// <see cref="TargetInvocationException"/> wrapper produced by reflection, matching what an awaited
/// handler failure looks like.
/// </remarks>
private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe)
{
    var parameterDescriptors = method.GetParameters();
    bool isGenericTask = method.ReturnType.IsGenericType
        && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);

    object?[]? executeParameters = null;
    if (parameterDescriptors.Length > 0)
    {
        // Item1 is the parameter type; Item2 is non-null when the handler takes a collection and then
        // holds the collection's element type.
        Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
        Type? elementType = tuple.Item2;

        IList? list = null;
        if (elementType != null)
        {
            Type listType = typeof(List<>).MakeGenericType(elementType);
            list = (IList)Activator.CreateInstance(listType)!;
        }

        executeParameters = new object?[parameterDescriptors.Length];
        for (var i = 0; i < parameterDescriptors.Length; i++)
        {
            var parameterType = parameterDescriptors[i].ParameterType;
            foreach (var item in messages)
            {
                object? converted = null;
                if (KafkaSerialization.IsJsonType(item))
                {
                    converted = KafkaSerialization.Deserialize(item, elementType ?? parameterType);
                }
                else
                {
                    // Non-JSON payload: try a type converter first, then a direct assignment, and
                    // finally a primitive conversion.
                    var converter = TypeDescriptor.GetConverter(parameterType);
                    if (converter.CanConvertFrom(item.GetType()))
                    {
                        converted = converter.ConvertFrom(item);
                    }
                    else if (parameterType.IsInstanceOfType(item))
                    {
                        converted = item;
                    }
                    else
                    {
                        converted = Convert.ChangeType(item, parameterType);
                    }
                }

                if (list == null)
                {
                    // Scalar parameter: with several messages the last one wins.
                    executeParameters[i] = converted;
                }
                else
                {
                    list.Add(converted);
                }
            }

            if (list != null && list.Count > 0)
            {
                executeParameters[i] = list;
            }
        }
    }

    object? result;
    try
    {
        result = method.Invoke(subscribe, executeParameters);
    }
    catch (TargetInvocationException ex) when (ex.InnerException != null)
    {
        // Surface the handler's own exception while keeping its original stack trace.
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
        throw; // Unreachable; keeps the compiler's definite-assignment analysis satisfied.
    }

    if (result is Task task)
    {
        await task.ConfigureAwait(false);

        if (isGenericTask)
        {
            // Task<T> is invariant, so "is Task<ISubscribeAck>" would miss Task<ConcreteAck>; inspect the
            // declared result type instead.
            Type resultType = method.ReturnType.GetGenericArguments()[0];
            if (typeof(ISubscribeAck).IsAssignableFrom(resultType))
            {
                var ack = (ISubscribeAck?)method.ReturnType.GetProperty("Result")!.GetValue(task);
                if (ack == null)
                {
                    return false;
                }
                return ack.Ack;
            }
        }

        return true;
    }

    if (result is ISubscribeAck ackResult)
    {
        return ackResult.Ack;
    }

    return false;
}
}
}