JiShe.CollectBus/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs

361 lines
15 KiB
C#
Raw Permalink Normal View History

2025-04-15 11:15:22 +08:00
using Confluent.Kafka;
2025-04-17 13:35:08 +08:00
using JiShe.CollectBus.Common.Consts;
2025-04-17 14:39:14 +08:00
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient;
2025-04-15 11:15:22 +08:00
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
2025-04-15 11:15:22 +08:00
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc.Abstractions;
2025-04-17 13:35:08 +08:00
using Microsoft.Extensions.Configuration;
2025-04-15 11:15:22 +08:00
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
2025-04-15 15:49:22 +08:00
using Microsoft.Extensions.Logging;
2025-04-17 14:39:14 +08:00
using Microsoft.Extensions.Options;
using System;
using System.Collections;
2025-04-17 22:21:34 +08:00
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq.Expressions;
2025-04-15 11:15:22 +08:00
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;
using YamlDotNet.Core.Tokens;
using static System.Runtime.InteropServices.JavaScript.JSType;
2025-04-15 11:15:22 +08:00
namespace JiShe.CollectBus.Kafka
{
public static class KafkaSubcribesExtensions
{
2025-04-17 11:53:29 +08:00
/// <summary>
/// Requests creation of every Kafka topic reported by the protocol constants
/// (issued topics followed by received topics).
/// </summary>
/// <param name="provider">Service provider used to resolve the admin client and the Kafka options.</param>
public static void UseInitKafkaTopic(this IServiceProvider provider)
{
    var adminClient = provider.GetRequiredService<IAdminClientService>();
    var optionsAccessor = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

    // Start from the issued topic names and append the received ones to the same list.
    List<string> topicNames = ProtocolConstExtensions.GetAllTopicNamesByIssued();
    var receivedTopicNames = ProtocolConstExtensions.GetAllTopicNamesByReceived();
    topicNames.AddRange(receivedTopicNames);

    foreach (var topicName in topicNames)
    {
        var options = optionsAccessor.Value;

        // This method is synchronous, so each creation request is waited on before the next one starts.
        adminClient
            .CreateTopicAsync(topicName, options.NumPartitions, options.KafkaReplicationFactor)
            .ConfigureAwait(false)
            .GetAwaiter()
            .GetResult();
    }
}
/// <summary>
/// Registers an <c>ApplicationStarted</c> callback that scans every <c>*.dll</c> in the entry
/// assembly's directory for concrete <see cref="IKafkaSubscribe"/> implementations, resolves
/// them from the container, and starts Kafka consumers for their subscribed methods.
/// </summary>
/// <param name="provider">Service provider used to resolve the host lifetime, options, logger and subscribers.</param>
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
    var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
    var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

    lifetime.ApplicationStarted.Register(() =>
    {
        var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
        int threadCount = 0;
        int topicCount = 0;

        var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
        if (string.IsNullOrWhiteSpace(assemblyPath))
        {
            logger.LogInformation($"kafka订阅未能找到程序路径");
            return;
        }

        foreach (var file in Directory.GetFiles(assemblyPath, "*.dll"))
        {
            Assembly assembly;
            try
            {
                // Reuse an already-loaded assembly with the same full name instead of loading it again.
                var assemblyName = AssemblyName.GetAssemblyName(file);
                var existingAssembly = AppDomain.CurrentDomain.GetAssemblies()
                    .FirstOrDefault(a => a.GetName().FullName == assemblyName.FullName);
                assembly = existingAssembly ?? Assembly.LoadFrom(file);
            }
            catch (Exception ex) when (ex is BadImageFormatException or System.IO.FileLoadException)
            {
                // Native or otherwise unloadable DLLs in the directory must not abort the whole scan.
                logger.LogDebug(ex, "Kafka subscribe scan skipped a file that is not a loadable .NET assembly: {File}", file);
                continue;
            }

            Type[] candidateTypes;
            try
            {
                candidateTypes = assembly.GetTypes();
            }
            catch (ReflectionTypeLoadException ex)
            {
                // Some types could not be loaded (e.g. a missing dependency); continue with the ones that did.
                logger.LogWarning(ex, "Kafka subscribe scan could only partially load types from: {File}", file);
                candidateTypes = ex.Types.Where(t => t != null).Select(t => t!).ToArray();
            }

            // Only concrete implementations of IKafkaSubscribe are considered.
            var subscribeTypes = candidateTypes
                .Where(type => typeof(IKafkaSubscribe).IsAssignableFrom(type) && !type.IsAbstract && !type.IsInterface)
                .ToList();
            if (subscribeTypes.Count == 0)
            {
                continue;
            }

            foreach (var subscribeType in subscribeTypes)
            {
                foreach (var subscribe in provider.GetServices(subscribeType).ToList())
                {
                    if (subscribe == null)
                    {
                        continue;
                    }

                    // Item1 = consumer tasks started, Item2 = subscribed methods found.
                    Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
                    threadCount += tuple.Item1;
                    topicCount += tuple.Item2;
                }
            }
        }

        logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
    });
}
2025-04-17 11:53:29 +08:00
2025-04-18 09:28:48 +08:00
/// <summary>
/// Registers an <c>ApplicationStarted</c> callback that looks for concrete
/// <see cref="IKafkaSubscribe"/> implementations in <paramref name="assembly"/>, resolves them
/// from the application's services, and starts Kafka consumers for their subscribed methods.
/// </summary>
/// <param name="app">Application builder whose <c>ApplicationServices</c> are used for resolution.</param>
/// <param name="assembly">Assembly to scan for subscriber types.</param>
/// <exception cref="ArgumentNullException"><paramref name="assembly"/> is <c>null</c>.</exception>
public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
{
    // Fail at the call site instead of with a NullReferenceException inside the startup callback.
    ArgumentNullException.ThrowIfNull(assembly);

    var provider = app.ApplicationServices;
    var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
    var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();

    lifetime.ApplicationStarted.Register(() =>
    {
        var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
        int threadCount = 0;
        int topicCount = 0;

        // Same filter as UseKafkaSubscribe: skip interfaces and abstract types so a subscriber
        // is not resolved (and its consumers started) a second time through a base type.
        var subscribeTypes = assembly.GetTypes()
            .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t) && !t.IsAbstract && !t.IsInterface)
            .ToList();
        if (subscribeTypes.Count == 0)
        {
            return;
        }

        foreach (var subscribeType in subscribeTypes)
        {
            foreach (var subscribe in provider.GetServices(subscribeType).ToList())
            {
                if (subscribe == null)
                {
                    continue;
                }

                // Item1 = consumer tasks started, Item2 = subscribed methods found.
                Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
                threadCount += tuple.Item1;
                topicCount += tuple.Item2;
            }
        }

        logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
    });
}
2025-04-15 11:15:22 +08:00
/// <summary>
/// Starts background consumer tasks for every public method of <paramref name="subscribe"/>
/// that carries a <see cref="KafkaSubscribeAttribute"/>.
/// </summary>
/// <param name="subscribe">Subscriber instance whose attributed methods are wired to Kafka topics.</param>
/// <param name="provider">Service provider handed to each consumer task.</param>
/// <param name="logger">Logger used by the consumer tasks and for reporting faulted tasks.</param>
/// <param name="kafkaOptionConfig">Kafka options; currently not read by this method.</param>
/// <returns>
/// A tuple where <c>Item1</c> is the number of consumer tasks started and <c>Item2</c> is the
/// number of attributed methods found.
/// </returns>
private static Tuple<int, int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{
    var subscribedMethods = subscribe.GetType().GetMethods()
        .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
        .Where(x => x.Attribute != null)
        .ToArray();

    int threadCount = 0;
    foreach (var sub in subscribedMethods)
    {
        var attribute = sub.Attribute!;
        var method = sub.Method;

        // Fixed number of consumer tasks per subscribed method (not taken from kafkaOptionConfig).
        int partitionCount = 3;
#if DEBUG
        // In debug builds, never start more consumers than the partition count reported for the topic.
        var adminClientService = provider.GetRequiredService<IAdminClientService>();
        int topicPartitions = adminClientService.GetTopicPartitionsNum(attribute.Topic);
        partitionCount = partitionCount > topicPartitions ? topicPartitions : partitionCount;
#endif
        if (partitionCount <= 0)
        {
            partitionCount = 1;
        }

        for (int i = 0; i < partitionCount; i++)
        {
            // The task is intentionally not awaited; the continuation makes sure a faulted
            // consumer task is logged instead of disappearing silently.
            _ = Task.Run(() => StartConsumerAsync(provider, attribute, method, subscribe, logger))
                .ContinueWith(
                    faulted => logger.LogError(faulted.Exception, "Kafka consumer task faulted, topic: {Topic}, method: {Method}", attribute.Topic, method.Name),
                    System.Threading.CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted,
                    TaskScheduler.Default);
            threadCount++;
        }
    }

    return Tuple.Create(threadCount, subscribedMethods.Length);
}
/// <summary>
/// Subscribes to the topic described by <paramref name="attr"/> and forwards every consumed
/// message (or batch of messages) to <paramref name="method"/> on <paramref name="subscribe"/>.
/// </summary>
/// <param name="provider">Service provider used to resolve <see cref="IConsumerService"/>.</param>
/// <param name="attr">Subscription settings: topic, group id and batch options.</param>
/// <param name="method">Subscriber method invoked for each message or batch.</param>
/// <param name="subscribe">Subscriber instance that <paramref name="method"/> is invoked on.</param>
/// <param name="logger">Logger for consumed messages (DEBUG builds) and consume errors.</param>
/// <returns>A task that completes when the underlying subscribe call completes.</returns>
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger<CollectBusKafkaModule> logger)
{
    var consumerService = provider.GetRequiredService<IConsumerService>();

    if (attr.EnableBatch)
    {
        await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
        {
            try
            {
#if DEBUG
                logger.LogInformation($"kafka批量消费消息:{message.Serialize()}");
#endif
                // Hand the whole batch to the subscriber method.
                return await ProcessMessageAsync(message.ToList(), method, subscribe);
            }
            catch (ConsumeException ex)
            {
                // Pass the exception itself so the stack trace and inner exception are not lost.
                logger.LogError(ex, $"kafka批量消费异常:{ex.Message}");
                return false;
            }
        }, attr.GroupId, attr.BatchSize, attr.BatchTimeout);
    }
    else
    {
        await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
        {
            try
            {
#if DEBUG
                logger.LogInformation($"kafka消费消息:{message}");
#endif
                // Wrap the single message so both modes share the same processing path.
                return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
            }
            catch (ConsumeException ex)
            {
                // Pass the exception itself so the stack trace and inner exception are not lost.
                logger.LogError(ex, $"kafka消费异常:{ex.Message}");
                return false;
            }
        }, attr.GroupId);
    }
}
2025-04-15 18:03:51 +08:00
2025-04-15 15:49:22 +08:00
/// <summary>
/// Converts the consumed messages into arguments for <paramref name="method"/>, invokes it on
/// <paramref name="subscribe"/> via reflection, and maps the result to an acknowledgement flag.
/// </summary>
/// <param name="messages">Consumed messages; one element for single consumption, several for a batch.</param>
/// <param name="method">Subscriber method to invoke.</param>
/// <param name="subscribe">Subscriber instance that <paramref name="method"/> is invoked on.</param>
/// <returns>
/// The <c>Ack</c> value when the method returns an <see cref="ISubscribeAck"/> (directly or as
/// <c>Task&lt;ISubscribeAck&gt;</c>); <c>true</c> when it returns any other task that completes;
/// otherwise <c>false</c>.
/// </returns>
private static async Task<bool> ProcessMessageAsync(List<dynamic> messages, MethodInfo method, object subscribe)
{
    var parameters = method.GetParameters();
    object?[]? executeParameters = null;

    if (parameters.Length > 0)
    {
        // NOTE(review): GetParameterTypeInfo is defined elsewhere; Item2 appears to be the element
        // type when the parameter is a collection and null otherwise — confirm against the helper.
        Tuple<Type, Type?> tuple = method.GetParameterTypeInfo();
        Type? elementType = tuple.Item2;

        executeParameters = new object?[parameters.Length];
        for (var i = 0; i < parameters.Length; i++)
        {
            // Convert to the element type for collection parameters, otherwise to the parameter type itself.
            Type targetType = elementType ?? parameters[i].ParameterType;

            // A fresh list per parameter, so messages are not appended again for every parameter.
            IList? list = elementType != null
                ? (IList)Activator.CreateInstance(typeof(List<>).MakeGenericType(elementType))!
                : null;

            foreach (var item in messages)
            {
                object? converted = ConvertMessage((object)item, targetType);
                if (list == null)
                {
                    // Non-collection parameter: the last message wins.
                    executeParameters[i] = converted;
                }
                else
                {
                    list.Add(converted);
                }
            }

            if (list != null && list.Count > 0)
            {
                executeParameters[i] = list;
            }
        }
    }

    var result = method.Invoke(subscribe, executeParameters);
    switch (result)
    {
        case Task<ISubscribeAck> ackTask:
            return (await ackTask.ConfigureAwait(false)).Ack;
        case Task task:
            await task.ConfigureAwait(false);
            return true;
        case ISubscribeAck ack:
            return ack.Ack;
        default:
            return false;
    }
}

/// <summary>
/// Converts a single consumed message to <paramref name="targetType"/>.
/// </summary>
/// <param name="message">Consumed message; kept dynamic so the serialization helpers bind on its runtime type.</param>
/// <param name="targetType">Type the subscriber method expects for this value.</param>
/// <returns>The converted value.</returns>
private static object? ConvertMessage(dynamic message, Type targetType)
{
    if (KafkaSerialization.IsJsonType(message))
    {
        return KafkaSerialization.Deserialize(message, targetType);
    }

    var converter = TypeDescriptor.GetConverter(targetType);
    if (converter.CanConvertFrom(message.GetType()))
    {
        return converter.ConvertFrom(message);
    }

    if (targetType.IsInstanceOfType(message))
    {
        return message;
    }

    return Convert.ChangeType(message, targetType);
}
2025-04-15 11:15:22 +08:00
}
2025-04-15 11:15:22 +08:00
}