using Confluent.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Reflection;
namespace JiShe.CollectBus.Kafka
{
public static class KafkaSubcribesExtensions
{
/////
///// 添加Kafka订阅
/////
/////
/////
//public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly[] assemblys)
//{
// var provider = app.ApplicationServices;
// var lifetime = provider.GetRequiredService();
// lifetime.ApplicationStarted.Register(() =>
// {
// var logger = provider.GetRequiredService>();
// int threadCount = 0;
// int topicCount = 0;
// foreach (Assembly assembly in assemblys)
// {
// var subscribeTypes = assembly.GetTypes()
// .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
// .ToList();
// if (subscribeTypes.Count == 0) return;
// foreach (var subscribeType in subscribeTypes)
// {
// var subscribes = provider.GetServices(subscribeType).ToList();
// subscribes.ForEach(subscribe => {
// if (subscribe is IKafkaSubscribe)
// {
// Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger);
// threadCount += tuple.Item1;
// topicCount += tuple.Item2;
// }
// });
// }
// }
// logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
// });
//}
public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
{
var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService();
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService>();
int threadCount = 0;
int topicCount = 0;
var subscribeTypes = assembly.GetTypes()
.Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
.ToList();
if (subscribeTypes.Count == 0) return;
foreach (var subscribeType in subscribeTypes)
{
var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => {
if (subscribe is IKafkaSubscribe)
{
Tuple tuple = BuildKafkaSubscriber(subscribe, provider, logger);
threadCount += tuple.Item1;
topicCount += tuple.Item2;
}
});
}
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
});
}
///
/// 构建Kafka订阅
///
///
///
private static Tuple BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger logger)
{
var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute() })
.Where(x => x.Attribute != null)
.ToArray();
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
var adminClientService = provider.GetRequiredService();
int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)
{
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
threadCount++;
}
}
return Tuple.Create(threadCount, subscribedMethods.Length);
}
///
/// 启动后台消费线程
///
///
///
///
///
///
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr,MethodInfo method, object subscribe, ILogger logger)
{
var consumerService = provider.GetRequiredService();
await consumerService.SubscribeAsync(attr.Topic, async (message) =>
{
try
{
// 处理消息
return await ProcessMessageAsync(message, method, subscribe);
}
catch (ConsumeException ex)
{
// 处理消费错误
logger.LogError($"kafka消费异常:{ex.Message}");
}
return await Task.FromResult(false);
});
}
///
/// 处理消息
///
///
///
///
///
private static async Task ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0;
//dynamic? messageObj= null;
//if (existParameters)
//{
// var paramType = parameters[0].ParameterType;
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
//}
if (isGenericTask)
{
object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
}
else
{
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
}
}
return false;
}
}
}