Compare commits

..

6 Commits

8 changed files with 89 additions and 50 deletions

View File

@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -113,6 +115,10 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE

View File

@ -278,7 +278,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null; return aa == null;
} }
//[KafkaSubscribe("test-topic1")] [KafkaSubscribe(ProtocolConst.TESTTOPIC)]
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj) public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{ {

View File

@ -12,11 +12,11 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 心跳下行消息主题 /// 心跳下行消息主题
/// </summary> /// </summary>
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event"; public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event2";
/// <summary> /// <summary>
/// 登录下行消息主题 /// 登录下行消息主题
/// </summary> /// </summary>
public const string SubscriberLoginIssuedEventName = "issued.login.event"; public const string SubscriberLoginIssuedEventName = "issued.login.event2";
/// <summary> /// <summary>
/// 上行消息主题,测试使用 /// 上行消息主题,测试使用
@ -26,11 +26,11 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 心跳上行消息主题 /// 心跳上行消息主题
/// </summary> /// </summary>
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event"; public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event2";
/// <summary> /// <summary>
/// 登录上行消息主题 /// 登录上行消息主题
/// </summary> /// </summary>
public const string SubscriberLoginReceivedEventName = "received.login.event"; public const string SubscriberLoginReceivedEventName = "received.login.event2";
#region #region
/// <summary> /// <summary>
@ -159,5 +159,11 @@ namespace JiShe.CollectBus.Common.Consts
/// AFN10H上行主题格式 /// AFN10H上行主题格式
/// </summary> /// </summary>
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event"; public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
/// <summary>
/// 测试主题格式
/// </summary>
public const string TESTTOPIC = "test-topic";
} }
} }

View File

@ -92,7 +92,7 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus3" "ServerTagName": "JiSheCollectBus2"
//"Topic": { //"Topic": {
// "ReplicationFactor": 3, // "ReplicationFactor": 3,
// "NumPartitions": 1000 // "NumPartitions": 1000

View File

@ -44,10 +44,10 @@ namespace JiShe.CollectBus.Kafka
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
{ {
var app = context.GetApplicationBuilder(); var app = context.GetApplicationBuilder();
// 注册Subscriber // 注册Subscriber
app.ApplicationServices.UseKafkaSubscribers(); app.ApplicationServices.UseKafkaSubscribe();
// 获取程序集 // 获取程序集
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application")); //app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
} }

View File

@ -1,4 +1,5 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记 EnablePartitionEof = true, // 启用分区末尾标记
//AllowAutoCreateTopics= true, // 启用自动创建 //AllowAutoCreateTopics = true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };
@ -122,7 +123,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
var result = consumer.Consume(cts.Token); var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null) if (result == null || result.Message==null || result.Message.Value == null)
{ {
#if DEBUG
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
#endif
//consumer.Commit(result); // 手动提交 //consumer.Commit(result); // 手动提交
continue; continue;
} }
@ -151,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
} }
} }
}); });
@ -170,15 +173,19 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <returns></returns> /// <returns></returns>
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>); var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
//{
// string ssss = "";
//}
var consumer = _consumerStore.GetOrAdd(consumerKey, _=> var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
( (
CreateConsumer<Ignore, TValue>(groupId), CreateConsumer<string, TValue>(groupId),
cts cts
)).Consumer as IConsumer<Ignore, TValue>; )).Consumer as IConsumer<string, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);
_ = Task.Run(async () => _ = Task.Run(async () =>
@ -190,8 +197,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
var result = consumer.Consume(cts.Token); var result = consumer.Consume(cts.Token);
if (result == null || result.Message==null || result.Message.Value == null) if (result == null || result.Message==null || result.Message.Value == null)
{ {
#if DEBUG
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
#endif
//consumer.Commit(result); // 手动提交 //consumer.Commit(result); // 手动提交
consumer.StoreOffset(result);
continue; continue;
} }
if (result.IsPartitionEOF) if (result.IsPartitionEOF)
@ -214,10 +224,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
bool sucess = await messageHandler(result.Message.Value); bool sucess = await messageHandler(result.Message.Value);
if (sucess) if (sucess)
consumer.Commit(result); // 手动提交 consumer.Commit(result); // 手动提交
else
consumer.StoreOffset(result);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)}消息消费失败: {ex.Error.Reason}");
} }
} }
}); });
@ -339,7 +351,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}"); _logger.LogError(ex, $"{string.Join("", topics)} 消息消费失败: {ex.Error.Reason}");
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -385,14 +397,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// <param name="consumeTimeout">消费等待时间</param> /// <param name="consumeTimeout">消费等待时间</param>
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
{ {
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>); var consumerKey = typeof(KafkaConsumer<string, TValue>);
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var consumer = _consumerStore.GetOrAdd(consumerKey, _ => var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
( (
CreateConsumer<Ignore, TValue>(groupId), CreateConsumer<string, TValue>(groupId),
cts cts
)).Consumer as IConsumer<Ignore, TValue>; )).Consumer as IConsumer<string, TValue>;
consumer!.Subscribe(topics); consumer!.Subscribe(topics);

View File

@ -1,6 +1,7 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
@ -10,7 +11,9 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
{ {
@ -21,7 +24,7 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="app"></param> /// <param name="app"></param>
/// <param name="assembly"></param> /// <param name="assembly"></param>
public static void UseKafkaSubscribers(this IServiceProvider provider) public static void UseKafkaSubscribe(this IServiceProvider provider)
{ {
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
@ -65,11 +68,11 @@ namespace JiShe.CollectBus.Kafka
foreach (var subscribeType in subscribeTypes) foreach (var subscribeType in subscribeTypes)
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => subscribes.ForEach(async subscribe =>
{ {
if (subscribe!=null) if (subscribe!=null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -77,6 +80,7 @@ namespace JiShe.CollectBus.Kafka
} }
} }
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程"); logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
}); });
} }
@ -109,11 +113,11 @@ namespace JiShe.CollectBus.Kafka
foreach (var subscribeType in subscribeTypes) foreach (var subscribeType in subscribeTypes)
{ {
var subscribes = provider.GetServices(subscribeType).ToList(); var subscribes = provider.GetServices(subscribeType).ToList();
subscribes.ForEach(subscribe => { subscribes.ForEach(async subscribe => {
if (subscribe != null) if (subscribe != null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value); Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -128,7 +132,7 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig) private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
@ -139,13 +143,16 @@ namespace JiShe.CollectBus.Kafka
foreach (var sub in subscribedMethods) foreach (var sub in subscribedMethods)
{ {
int partitionCount = 3;// kafkaOptionConfig.NumPartitions; int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>(); var adminClientService = provider.GetRequiredService<IAdminClientService>();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
partitionCount = 1; partitionCount = 1;
for (int i = 0; i < partitionCount; i++) for (int i = 0; i < partitionCount; i++)
{ {
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger);
threadCount++; threadCount++;
} }
} }
@ -166,12 +173,12 @@ namespace JiShe.CollectBus.Kafka
if (attr.EnableBatch) if (attr.EnableBatch)
{ {
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
{ {
try try
{ {
// 处理消息 // 处理消息
return await ProcessMessageAsync(message, method, subscribe); return await ProcessMessageAsync(message.ToList(), method, subscribe);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
@ -183,12 +190,12 @@ namespace JiShe.CollectBus.Kafka
} }
else else
{ {
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) => await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
{ {
try try
{ {
// 处理消息 // 处理消息
return await ProcessMessageAsync(message, method, subscribe); return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
} }
catch (ConsumeException ex) catch (ConsumeException ex)
{ {
@ -209,36 +216,42 @@ namespace JiShe.CollectBus.Kafka
/// <param name="method"></param> /// <param name="method"></param>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <returns></returns> /// <returns></returns>
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe) private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
{ {
var parameters = method.GetParameters(); var parameters = method.GetParameters();
bool isGenericTask = method.ReturnType.IsGenericType bool isGenericTask = method.ReturnType.IsGenericType
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
bool existParameters = parameters.Length > 0; bool existParameters = parameters.Length > 0;
//dynamic? messageObj= null; List<object>? messageObj = null;
//if (existParameters) if (existParameters)
//{
// var paramType = parameters[0].ParameterType;
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
//}
if (isGenericTask)
{ {
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!; messageObj = new List<object>();
if (result is ISubscribeAck ackResult) var paramType = parameters[0].ParameterType;
foreach (var msg in messages)
{ {
return ackResult.Ack; var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
if (data != null)
messageObj.Add(data);
} }
} }
else
var result = method.Invoke(subscribe, messageObj?.ToArray());
if (result is Task<ISubscribeAck> genericTask)
{ {
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null); await genericTask.ConfigureAwait(false);
if (result is ISubscribeAck ackResult) return genericTask.Result.Ack;
{ }
return ackResult.Ack; else if (result is Task nonGenericTask)
} {
await nonGenericTask.ConfigureAwait(false);
return true;
}
else if (result is ISubscribeAck ackResult)
{
return ackResult.Ack;
} }
return false; return false;
} }
} }
} }

View File

@ -129,6 +129,7 @@ namespace JiShe.CollectBus.Kafka.Producer
var producer = GetProducer<string, TValue>(typeKey); var producer = GetProducer<string, TValue>(typeKey);
var message = new Message<string, TValue> var message = new Message<string, TValue>
{ {
Key= _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
@ -187,6 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer
{ {
var message = new Message<string, TValue> var message = new Message<string, TValue>
{ {
Key = _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }