133 lines
4.7 KiB
C#
Raw Normal View History

2025-04-15 11:15:22 +08:00
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
2025-04-15 15:49:22 +08:00
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
2025-04-15 11:15:22 +08:00
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
2025-04-15 15:49:22 +08:00
using static Confluent.Kafka.ConfigPropertyNames;
2025-04-15 11:15:22 +08:00
namespace JiShe.CollectBus.Kafka
{
/// <summary>
/// Extension methods that discover <see cref="IKafkaSubscribe"/> services and start a Kafka
/// consumer for each of their methods marked with <see cref="KafkaSubscribeAttribute"/>.
/// </summary>
public static class KafkaSubcribesExtensions
{
    /// <summary>
    /// Adds Kafka subscriptions for the subscriber types found in <paramref name="assembly"/>.
    /// </summary>
    /// <remarks>
    /// The assembly is scanned immediately; the subscribers themselves are resolved from the
    /// service provider and wired up only once the host signals <c>ApplicationStarted</c>.
    /// </remarks>
    /// <param name="app">Application builder whose service provider is used to resolve subscribers.</param>
    /// <param name="assembly">Assembly scanned for types assignable to <see cref="IKafkaSubscribe"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="app"/> or <paramref name="assembly"/> is <c>null</c>.
    /// </exception>
    public static void UseKafkaSubscribers(this IApplicationBuilder app, Assembly assembly)
    {
        if (app == null)
        {
            throw new ArgumentNullException(nameof(app));
        }

        if (assembly == null)
        {
            throw new ArgumentNullException(nameof(assembly));
        }

        var subscribeTypes = assembly.GetTypes()
            .Where(t => typeof(IKafkaSubscribe).IsAssignableFrom(t))
            .ToList();
        if (subscribeTypes.Count == 0)
        {
            return;
        }

        var provider = app.ApplicationServices;
        var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
        lifetime.ApplicationStarted.Register(() =>
        {
            // The scan above also matches interfaces and abstract bases, so one implementation
            // can be resolved through several service types. Track the concrete types already
            // wired so each implementation gets exactly one set of consumers.
            var wiredTypes = new HashSet<Type>();
            foreach (var subscribeType in subscribeTypes)
            {
                foreach (var subscribe in provider.GetServices(subscribeType))
                {
                    if (subscribe is IKafkaSubscribe && wiredTypes.Add(subscribe.GetType()))
                    {
                        BuildKafkaSubscriber(subscribe, provider);
                    }
                }
            }
        });
    }

    /// <summary>
    /// Starts one background consumer for every public method of <paramref name="subscribe"/>
    /// that carries a <see cref="KafkaSubscribeAttribute"/>.
    /// </summary>
    /// <param name="subscribe">Subscriber instance whose methods are inspected.</param>
    /// <param name="provider">Service provider passed on to each consumer.</param>
    private static void BuildKafkaSubscriber(object subscribe, IServiceProvider provider)
    {
        var subscribedMethods = subscribe.GetType().GetMethods()
            .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
            .Where(x => x.Attribute != null)
            .ToArray();

        foreach (var sub in subscribedMethods)
        {
            // Intentionally not awaited: each consumer runs in the background.
            // StartConsumerAsync logs subscription failures itself, so none go unobserved.
            _ = Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe));
        }
    }

    /// <summary>
    /// Subscribes to the topics declared on <paramref name="attr"/> and forwards every received
    /// message to <paramref name="method"/> on <paramref name="subscribe"/>.
    /// </summary>
    /// <param name="provider">Service provider used to resolve the consumer service and logger.</param>
    /// <param name="attr">Attribute that supplies the topics to subscribe to.</param>
    /// <param name="method">Handler method invoked for each message.</param>
    /// <param name="subscribe">Instance on which <paramref name="method"/> is invoked.</param>
    /// <returns>A task that completes when the subscription call returns or fails.</returns>
    private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe)
    {
        var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
        try
        {
            var consumerService = provider.GetRequiredService<IConsumerService>();
            await consumerService.SubscribeAsync<string>(attr.Topics, async (message) =>
            {
                try
                {
                    return await ProcessMessageAsync(message, method, subscribe);
                }
                catch (Exception ex)
                {
                    // Handler failures arrive as TargetInvocationException, JSON errors, etc.,
                    // not only ConsumeException. Log with the exception attached and report
                    // the message as not handled.
                    logger.LogError(ex, "kafka消费异常:{Message}", ex.Message);
                    return false;
                }
            });
        }
        catch (Exception ex)
        {
            // This method runs fire-and-forget, so this is the only place a failure can be logged.
            logger.LogError(
                ex,
                "Kafka subscription failed for handler {Handler}",
                $"{subscribe.GetType().FullName}.{method.Name}");
        }
    }

    /// <summary>
    /// Converts <paramref name="message"/> to the handler's parameter type, invokes the handler
    /// and translates its result into a success flag.
    /// </summary>
    /// <param name="message">Raw message text; passed through as-is for <c>string</c> parameters, otherwise deserialized from JSON.</param>
    /// <param name="method">Handler method; it is only invoked when it declares exactly one parameter.</param>
    /// <param name="subscribe">Instance on which <paramref name="method"/> is invoked.</param>
    /// <returns>
    /// <c>true</c> when the handler does not take exactly one parameter (it is not invoked);
    /// the handler's own value when it returns <c>bool</c> or <c>Task&lt;bool&gt;</c>;
    /// <c>false</c> for any other return type, including <c>void</c> and non-generic <c>Task</c>.
    /// </returns>
    private static async Task<bool> ProcessMessageAsync(string message, MethodInfo method, object subscribe)
    {
        var parameters = method.GetParameters();
        if (parameters.Length != 1)
        {
            return true;
        }

        var paramType = parameters[0].ParameterType;
        var messageObj = paramType == typeof(string)
            ? message
            : JsonConvert.DeserializeObject(message, paramType);

        var result = method.Invoke(subscribe, new[] { messageObj });

        // Dispatch on the runtime result rather than the declared return type:
        // a Task<bool> handler has ReturnType == typeof(Task<bool>), not typeof(Task).
        switch (result)
        {
            case Task<bool> boolTask:
                return await boolTask;
            case Task task:
                // Await so failures surface here; there is no flag to report,
                // so the outcome matches that of a synchronous void handler.
                await task;
                return false;
            case bool success:
                return success;
            default:
                return false;
        }
    }
}
}