优化kafka 订阅增加判断时间分区数是否大于配置数
This commit is contained in:
parent
f11e514ef4
commit
532acc575f
@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
@ -113,6 +115,10 @@ Global
|
||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
||||
@ -91,7 +91,7 @@
|
||||
"SaslUserName": "lixiao",
|
||||
"SaslPassword": "lixiao1980",
|
||||
"KafkaReplicationFactor": 3,
|
||||
"NumPartitions": 1,
|
||||
"NumPartitions": 30,
|
||||
"ServerTagName": "JiSheCollectBus2"
|
||||
//"Topic": {
|
||||
// "ReplicationFactor": 3,
|
||||
|
||||
@ -175,16 +175,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
||||
{
|
||||
var consumerKey = typeof(KafkaConsumer<string, TValue>);
|
||||
var cts = new CancellationTokenSource();
|
||||
if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
||||
{
|
||||
string ssss = "";
|
||||
}
|
||||
//if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName))
|
||||
//{
|
||||
// string ssss = "";
|
||||
//}
|
||||
var consumer = _consumerStore.GetOrAdd(consumerKey, _=>
|
||||
(
|
||||
CreateConsumer<string, TValue>(groupId),
|
||||
cts
|
||||
)).Consumer as IConsumer<string, TValue>;
|
||||
|
||||
|
||||
consumer!.Subscribe(topics);
|
||||
|
||||
_ = Task.Run(async () =>
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
using Confluent.Kafka;
|
||||
using JiShe.CollectBus.Common.Consts;
|
||||
using JiShe.CollectBus.Common.Extensions;
|
||||
using JiShe.CollectBus.Common.Helpers;
|
||||
using JiShe.CollectBus.Kafka.AdminClient;
|
||||
using JiShe.CollectBus.Kafka.Attributes;
|
||||
using JiShe.CollectBus.Kafka.Consumer;
|
||||
@ -12,6 +13,7 @@ using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Newtonsoft.Json;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
@ -66,11 +68,11 @@ namespace JiShe.CollectBus.Kafka
|
||||
foreach (var subscribeType in subscribeTypes)
|
||||
{
|
||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||
subscribes.ForEach(subscribe =>
|
||||
subscribes.ForEach(async subscribe =>
|
||||
{
|
||||
if (subscribe!=null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
Tuple<int, int> tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
threadCount += tuple.Item1;
|
||||
topicCount += tuple.Item2;
|
||||
}
|
||||
@ -111,11 +113,11 @@ namespace JiShe.CollectBus.Kafka
|
||||
foreach (var subscribeType in subscribeTypes)
|
||||
{
|
||||
var subscribes = provider.GetServices(subscribeType).ToList();
|
||||
subscribes.ForEach(subscribe => {
|
||||
subscribes.ForEach(async subscribe => {
|
||||
|
||||
if (subscribe != null)
|
||||
{
|
||||
Tuple<int, int> tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
Tuple<int, int> tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value);
|
||||
threadCount += tuple.Item1;
|
||||
topicCount += tuple.Item2;
|
||||
}
|
||||
@ -130,7 +132,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
/// </summary>
|
||||
/// <param name="subscribe"></param>
|
||||
/// <param name="provider"></param>
|
||||
private static Tuple<int,int> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
||||
private static async Task<Tuple<int,int>> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
|
||||
{
|
||||
var subscribedMethods = subscribe.GetType().GetMethods()
|
||||
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
|
||||
@ -141,13 +143,16 @@ namespace JiShe.CollectBus.Kafka
|
||||
foreach (var sub in subscribedMethods)
|
||||
{
|
||||
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
|
||||
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
||||
var adminClientService = provider.GetRequiredService<IAdminClientService>();
|
||||
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
|
||||
partitionCount= partitionCount> topicCount ? topicCount: partitionCount;
|
||||
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
|
||||
if (partitionCount <= 0)
|
||||
partitionCount = 1;
|
||||
for (int i = 0; i < partitionCount; i++)
|
||||
{
|
||||
Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger));
|
||||
//if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName)
|
||||
await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger);
|
||||
threadCount++;
|
||||
}
|
||||
}
|
||||
@ -173,7 +178,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
try
|
||||
{
|
||||
// 处理消息
|
||||
return await ProcessMessageAsync(message, method, subscribe);
|
||||
return await ProcessMessageAsync(message.ToList(), method, subscribe);
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
@ -190,7 +195,7 @@ namespace JiShe.CollectBus.Kafka
|
||||
try
|
||||
{
|
||||
// 处理消息
|
||||
return await ProcessMessageAsync(message, method, subscribe);
|
||||
return await ProcessMessageAsync(new List<object>() { message }, method, subscribe);
|
||||
}
|
||||
catch (ConsumeException ex)
|
||||
{
|
||||
@ -211,35 +216,26 @@ namespace JiShe.CollectBus.Kafka
|
||||
/// <param name="method"></param>
|
||||
/// <param name="subscribe"></param>
|
||||
/// <returns></returns>
|
||||
private static async Task<bool> ProcessMessageAsync(object message, MethodInfo method, object subscribe)
|
||||
private static async Task<bool> ProcessMessageAsync(List<object> messages, MethodInfo method, object subscribe)
|
||||
{
|
||||
var parameters = method.GetParameters();
|
||||
bool isGenericTask = method.ReturnType.IsGenericType
|
||||
&& method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>);
|
||||
bool existParameters = parameters.Length > 0;
|
||||
//dynamic? messageObj = null;
|
||||
//if (existParameters)
|
||||
//{
|
||||
// var paramType = parameters[0].ParameterType;
|
||||
// if (paramType.IsInstanceOfType(message))
|
||||
// return message; // 类型兼容则直接使用
|
||||
|
||||
// var json = message.ToString();
|
||||
// messageObj= message.de JsonConvert.DeserializeObject(json, targetType);
|
||||
//}
|
||||
|
||||
object[] args = null;
|
||||
List<object>? messageObj = null;
|
||||
if (existParameters)
|
||||
{
|
||||
messageObj = new List<object>();
|
||||
var paramType = parameters[0].ParameterType;
|
||||
// 类型转换逻辑
|
||||
object convertedMessage = paramType.IsInstanceOfType(message)
|
||||
? message
|
||||
: ConvertMessage(message, paramType);
|
||||
|
||||
args = new object[] { convertedMessage };
|
||||
foreach (var msg in messages)
|
||||
{
|
||||
var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg;
|
||||
if (data != null)
|
||||
messageObj.Add(data);
|
||||
}
|
||||
}
|
||||
var result = method.Invoke(subscribe, args);
|
||||
|
||||
var result = method.Invoke(subscribe, messageObj?.ToArray());
|
||||
if (result is Task<ISubscribeAck> genericTask)
|
||||
{
|
||||
await genericTask.ConfigureAwait(false);
|
||||
@ -254,33 +250,8 @@ namespace JiShe.CollectBus.Kafka
|
||||
{
|
||||
return ackResult.Ack;
|
||||
}
|
||||
|
||||
//if (isGenericTask)
|
||||
//{
|
||||
// object? result = await (Task<ISubscribeAck>)method.Invoke(subscribe, existParameters? new[] { message } :null)!;
|
||||
// if (result is ISubscribeAck ackResult)
|
||||
// {
|
||||
// return ackResult.Ack;
|
||||
// }
|
||||
//}
|
||||
//else
|
||||
//{
|
||||
// object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null);
|
||||
// if (result is ISubscribeAck ackResult)
|
||||
// {
|
||||
// return ackResult.Ack;
|
||||
// }
|
||||
//}
|
||||
return false;
|
||||
}
|
||||
// 可扩展的类型转换器
|
||||
private static object ConvertMessage(object rawMessage, Type targetType)
|
||||
{
|
||||
if (targetType.IsInstanceOfType(rawMessage))
|
||||
return rawMessage; // 类型兼容则直接使用
|
||||
|
||||
var json = rawMessage.ToString();
|
||||
return JsonConvert.DeserializeObject(json, targetType);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user