Compare commits
6 Commits
19bc05c601
...
444b01095f
| Author | SHA1 | Date | |
|---|---|---|---|
| 444b01095f | |||
| 532acc575f | |||
| f11e514ef4 | |||
| c4d4078bd9 | |||
| b064ff3d14 | |||
| 48b91183c2 |
@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -113,6 +115,10 @@ Global
|
|||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
|
|||||||
@ -278,7 +278,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
return aa == null;
|
return aa == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
//[KafkaSubscribe("test-topic1")]
|
[KafkaSubscribe(ProtocolConst.TESTTOPIC)]
|
||||||
|
|
||||||
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -12,11 +12,11 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 心跳下行消息主题
|
/// 心跳下行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event";
|
public const string SubscriberHeartbeatIssuedEventName = "issued.heartbeat.event2";
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 登录下行消息主题
|
/// 登录下行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberLoginIssuedEventName = "issued.login.event";
|
public const string SubscriberLoginIssuedEventName = "issued.login.event2";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 上行消息主题,测试使用
|
/// 上行消息主题,测试使用
|
||||||
@ -26,11 +26,11 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 心跳上行消息主题
|
/// 心跳上行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event";
|
public const string SubscriberHeartbeatReceivedEventName = "received.heartbeat.event2";
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 登录上行消息主题
|
/// 登录上行消息主题
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberLoginReceivedEventName = "received.login.event";
|
public const string SubscriberLoginReceivedEventName = "received.login.event2";
|
||||||
|
|
||||||
#region 电表消息主题
|
#region 电表消息主题
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -159,5 +159,11 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// AFN10H上行主题格式
|
/// AFN10H上行主题格式
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
|
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 测试主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string TESTTOPIC = "test-topic";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -92,7 +92,7 @@
|
|||||||
"SaslPassword": "lixiao1980",
|
"SaslPassword": "lixiao1980",
|
||||||
"KafkaReplicationFactor": 3,
|
"KafkaReplicationFactor": 3,
|
||||||
"NumPartitions": 30,
|
"NumPartitions": 30,
|
||||||
"ServerTagName": "JiSheCollectBus3"
|
"ServerTagName": "JiSheCollectBus2"
|
||||||
//"Topic": {
|
//"Topic": {
|
||||||
// "ReplicationFactor": 3,
|
// "ReplicationFactor": 3,
|
||||||
// "NumPartitions": 1000
|
// "NumPartitions": 1000
|
||||||
|
|||||||
@ -44,10 +44,10 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
var app = context.GetApplicationBuilder();
|
var app = context.GetApplicationBuilder();
|
||||||
|
|
||||||
// 注册Subscriber
|
// 注册Subscriber
|
||||||
app.ApplicationServices.UseKafkaSubscribers();
|
app.ApplicationServices.UseKafkaSubscribe();
|
||||||
|
|
||||||
// 获取程序集
|
// 获取程序集
|
||||||
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
//app.UseKafkaSubscribers(Assembly.Load("JiShe.CollectBus.Application"));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
@ -50,7 +51,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
EnableAutoCommit = false, // 禁止AutoCommit
|
EnableAutoCommit = false, // 禁止AutoCommit
|
||||||
EnablePartitionEof = true, // 启用分区末尾标记
|
EnablePartitionEof = true, // 启用分区末尾标记
|
||||||
//AllowAutoCreateTopics= true, // 启用自动创建
|
//AllowAutoCreateTopics = true, // 启用自动创建
|
||||||
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
|
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小(50MB)
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -122,7 +123,9 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
if (result == null || result.Message==null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||||
|
#endif
|
||||||
//consumer.Commit(result); // 手动提交
|
//consumer.Commit(result); // 手动提交
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -151,7 +154,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -170,15 +173,19 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
public async Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId) where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
||||||
|
//{
|
||||||
|
// string ssss = "";
|
||||||
|
//}
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<string, TValue>(groupId),
|
||||||
cts
|
cts
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
)).Consumer as IConsumer<string, TValue>;
|
||||||
|
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
_ = Task.Run(async () =>
|
_ = Task.Run(async () =>
|
||||||
@ -190,8 +197,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
var result = consumer.Consume(cts.Token);
|
var result = consumer.Consume(cts.Token);
|
||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
if (result == null || result.Message==null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||||
|
#endif
|
||||||
//consumer.Commit(result); // 手动提交
|
//consumer.Commit(result); // 手动提交
|
||||||
|
consumer.StoreOffset(result);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
@ -214,10 +224,12 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
bool sucess = await messageHandler(result.Message.Value);
|
bool sucess = await messageHandler(result.Message.Value);
|
||||||
if (sucess)
|
if (sucess)
|
||||||
consumer.Commit(result); // 手动提交
|
consumer.Commit(result); // 手动提交
|
||||||
|
else
|
||||||
|
consumer.StoreOffset(result);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)}消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -339,7 +351,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, $"消息消费失败: {ex.Error.Reason}");
|
_logger.LogError(ex, $"{string.Join("、", topics)} 消息消费失败: {ex.Error.Reason}");
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
{
|
{
|
||||||
@ -385,14 +397,14 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <param name="consumeTimeout">消费等待时间</param>
|
/// <param name="consumeTimeout">消费等待时间</param>
|
||||||
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
public async Task SubscribeBatchAsync<TValue>(string[] topics,Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100,TimeSpan? batchTimeout = null,TimeSpan? consumeTimeout = null)where TValue : class
|
||||||
{
|
{
|
||||||
var consumerKey = typeof(KafkaConsumer<Ignore, TValue>);
|
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||||
var cts = new CancellationTokenSource();
|
var cts = new CancellationTokenSource();
|
||||||
|
|
||||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
var consumer = _consumerStore.GetOrAdd(consumerKey, _ =>
|
||||||
(
|
(
|
||||||
CreateConsumer<Ignore, TValue>(groupId),
|
CreateConsumer<string, TValue>(groupId),
|
||||||
cts
|
cts
|
||||||
)).Consumer as IConsumer<Ignore, TValue>;
|
)).Consumer as IConsumer<string, TValue>;
|
||||||
|
|
||||||
consumer!.Subscribe(topics);
|
consumer!.Subscribe(topics);
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Kafka.AdminClient;
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
using JiShe.CollectBus.Kafka.Attributes;
|
using JiShe.CollectBus.Kafka.Attributes;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
@ -10,7 +11,9 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using Newtonsoft.Json;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka
|
namespace JiShe.CollectBus.Kafka
|
||||||
{
|
{
|
||||||
@ -21,7 +24,7 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="app"></param>
|
/// <param name="app"></param>
|
||||||
/// <param name="assembly"></param>
|
/// <param name="assembly"></param>
|
||||||
public static void UseKafkaSubscribers(this IServiceProvider provider)
|
public static void UseKafkaSubscribe(this IServiceProvider provider)
|
||||||
{
|
{
|
||||||
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
|
||||||
|
|
||||||
@ -65,11 +68,11 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
foreach (var subscribeType in subscribeTypes)
|
foreach (var subscribeType in subscribeTypes)
|
||||||
{
|
{
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||||
subscribes.ForEach(subscribe =>
|
subscribes.ForEach(async subscribe =>
|
||||||
{
|
{
|
||||||
if (subscribe!=null)
|
if (subscribe!=null)
|
||||||
{
|
{
|
||||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||||
threadCount += tuple.Item1;
|
threadCount += tuple.Item1;
|
||||||
topicCount += tuple.Item2;
|
topicCount += tuple.Item2;
|
||||||
}
|
}
|
||||||
@ -77,6 +80,7 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
logger.LogInformation($"kafka订阅主题:{topicCount}数,共启动:{threadCount}线程");
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,11 +113,11 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
foreach (var subscribeType in subscribeTypes)
|
foreach (var subscribeType in subscribeTypes)
|
||||||
{
|
{
|
||||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||||
subscribes.ForEach(subscribe => {
|
subscribes.ForEach(async subscribe => {
|
||||||
|
|
||||||
if (subscribe != null)
|
if (subscribe != null)
|
||||||
{
|
{
|
||||||
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
|
Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||||
threadCount += tuple.Item1;
|
threadCount += tuple.Item1;
|
||||||
topicCount += tuple.Item2;
|
topicCount += tuple.Item2;
|
||||||
}
|
}
|
||||||
@ -128,7 +132,7 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <param name="provider"></param>
|
/// <param name="provider"></param>
|
||||||
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
||||||
{
|
{
|
||||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
var subscribedMethods = subscribe.GetType().GetMethods()
|
||||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
||||||
@ -139,13 +143,16 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
foreach (var sub in subscribedMethods)
|
foreach (var sub in subscribedMethods)
|
||||||
{
|
{
|
||||||
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
|
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
|
||||||
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
||||||
|
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
|
||||||
|
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
|
||||||
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
||||||
if (partitionCount <= 0)
|
if (partitionCount <= 0)
|
||||||
partitionCount = 1;
|
partitionCount = 1;
|
||||||
for (int i = 0; i < partitionCount; i++)
|
for (int i = 0; i < partitionCount; i++)
|
||||||
{
|
{
|
||||||
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
|
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
|
||||||
|
await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger);
|
||||||
threadCount++;
|
threadCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -166,12 +173,12 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
|
|
||||||
if (attr.EnableBatch)
|
if (attr.EnableBatch)
|
||||||
{
|
{
|
||||||
await consumerService.SubscribeBatchAsync<dynamic>(attr.Topic, async (message) =>
|
await consumerService.SubscribeBatchAsync<object>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// 处理消息
|
// 处理消息
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
return await ProcessMessageAsync(message.ToList(), method, subscribe);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
@ -183,12 +190,12 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
await consumerService.SubscribeAsync<dynamic>(attr.Topic, async (message) =>
|
await consumerService.SubscribeAsync<object>(attr.Topic, async (message) =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// 处理消息
|
// 处理消息
|
||||||
return await ProcessMessageAsync(message, method, subscribe);
|
return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
|
||||||
}
|
}
|
||||||
catch (ConsumeException ex)
|
catch (ConsumeException ex)
|
||||||
{
|
{
|
||||||
@ -209,36 +216,42 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
/// <param name="method"></param>
|
/// <param name="method"></param>
|
||||||
/// <param name="subscribe"></param>
|
/// <param name="subscribe"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private static async Task<bool> ProcessMessageAsync(dynamic message, MethodInfo method, object subscribe)
|
private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
|
||||||
{
|
{
|
||||||
var parameters = method.GetParameters();
|
var parameters = method.GetParameters();
|
||||||
bool isGenericTask = method.ReturnType.IsGenericType
|
bool isGenericTask = method.ReturnType.IsGenericType
|
||||||
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
||||||
bool existParameters = parameters.Length > 0;
|
bool existParameters = parameters.Length > 0;
|
||||||
//dynamic? messageObj= null;
|
List<object>? messageObj = null;
|
||||||
//if (existParameters)
|
if (existParameters)
|
||||||
//{
|
|
||||||
// var paramType = parameters[0].ParameterType;
|
|
||||||
// messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType);
|
|
||||||
//}
|
|
||||||
if (isGenericTask)
|
|
||||||
{
|
{
|
||||||
object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
messageObj = new List<object>();
|
||||||
if (result is ISubscribeAck ackResult)
|
var paramType = parameters[0].ParameterType;
|
||||||
|
foreach (var msg in messages)
|
||||||
{
|
{
|
||||||
return ackResult.Ack;
|
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
|
||||||
|
if (data != null)
|
||||||
|
messageObj.Add(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
|
||||||
|
var result = method.Invoke(subscribe, messageObj?.ToArray());
|
||||||
|
if (result is Task<ISubscribeAck> genericTask)
|
||||||
{
|
{
|
||||||
object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
await genericTask.ConfigureAwait(false);
|
||||||
if (result is ISubscribeAck ackResult)
|
return genericTask.Result.Ack;
|
||||||
{
|
}
|
||||||
return ackResult.Ack;
|
else if (result is Task nonGenericTask)
|
||||||
}
|
{
|
||||||
|
await nonGenericTask.ConfigureAwait(false);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (result is ISubscribeAck ackResult)
|
||||||
|
{
|
||||||
|
return ackResult.Ack;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -129,6 +129,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
var producer = GetProducer<string, TValue>(typeKey);
|
var producer = GetProducer<string, TValue>(typeKey);
|
||||||
var message = new Message<string, TValue>
|
var message = new Message<string, TValue>
|
||||||
{
|
{
|
||||||
|
Key= _kafkaOptionConfig.ServerTagName,
|
||||||
Value = value,
|
Value = value,
|
||||||
Headers = new Headers{
|
Headers = new Headers{
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||||
@ -187,6 +188,7 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
{
|
{
|
||||||
var message = new Message<string, TValue>
|
var message = new Message<string, TValue>
|
||||||
{
|
{
|
||||||
|
Key = _kafkaOptionConfig.ServerTagName,
|
||||||
Value = value,
|
Value = value,
|
||||||
Headers = new Headers{
|
Headers = new Headers{
|
||||||
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user