diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 9c67aae..0d0c5e4 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -39,6 +39,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -113,6 +115,10 @@ Global {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index affc34d..57c1823 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -91,7 +91,7 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 1, + "NumPartitions": 30, "ServerTagName": "JiSheCollectBus2" //"Topic": { // "ReplicationFactor": 3, diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index ea67b74..4340524 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -175,16 +175,17 @@ namespace JiShe.CollectBus.Kafka.Consumer { var consumerKey = typeof(KafkaConsumer); var cts = new CancellationTokenSource(); - if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) - { - string ssss = ""; - } + //if (topics.Contains(ProtocolConst.SubscriberLoginReceivedEventName)) + //{ + // string ssss = ""; + //} var consumer = _consumerStore.GetOrAdd(consumerKey, _=> ( CreateConsumer(groupId), cts )).Consumer as IConsumer; + consumer!.Subscribe(topics); _ = Task.Run(async () => diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 8962327..b09b3be 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; @@ -12,6 +13,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Newtonsoft.Json; using System.Reflection; +using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka { @@ -66,11 +68,11 @@ namespace JiShe.CollectBus.Kafka foreach (var subscribeType in subscribeTypes) { var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(subscribe => + subscribes.ForEach(async subscribe => { if (subscribe!=null) { - Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); + Tuple tuple = await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -111,11 +113,11 @@ namespace JiShe.CollectBus.Kafka foreach (var subscribeType in subscribeTypes) { var subscribes = provider.GetServices(subscribeType).ToList(); - subscribes.ForEach(subscribe => { + subscribes.ForEach(async subscribe => { if (subscribe != null) { - Tuple tuple = BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); + Tuple tuple =await BuildKafkaSubscribe(subscribe, provider, logger, kafkaOptions.Value); threadCount += tuple.Item1; topicCount += tuple.Item2; } @@ -130,7 +132,7 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) + private static async Task> BuildKafkaSubscribe(object subscribe, IServiceProvider provider,ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() .Select(m => new { Method = m, Attribute = m.GetCustomAttribute() }) @@ -141,13 +143,16 @@ namespace JiShe.CollectBus.Kafka foreach (var sub in subscribedMethods) { int partitionCount = 3;// kafkaOptionConfig.NumPartitions; - //var adminClientService = provider.GetRequiredService(); + var adminClientService = provider.GetRequiredService(); + int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic); + partitionCount= partitionCount> topicCount ? topicCount: partitionCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) partitionCount = 1; for (int i = 0; i < partitionCount; i++) { - Task.Run(() => StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger)); + //if (sub.Attribute!.Topic == ProtocolConst.SubscriberLoginReceivedEventName) + await StartConsumerAsync(provider, sub.Attribute!, sub.Method, subscribe, logger); threadCount++; } } @@ -173,7 +178,7 @@ namespace JiShe.CollectBus.Kafka try { // 处理消息 - return await ProcessMessageAsync(message, method, subscribe); + return await ProcessMessageAsync(message.ToList(), method, subscribe); } catch (ConsumeException ex) { @@ -190,7 +195,7 @@ namespace JiShe.CollectBus.Kafka try { // 处理消息 - return await ProcessMessageAsync(message, method, subscribe); + return await ProcessMessageAsync(new List() { message }, method, subscribe); } catch (ConsumeException ex) { @@ -211,35 +216,26 @@ namespace JiShe.CollectBus.Kafka /// /// /// - private static async Task ProcessMessageAsync(object message, MethodInfo method, object subscribe) + private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); bool isGenericTask = method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); bool existParameters = parameters.Length > 0; - //dynamic? messageObj = null; - //if (existParameters) - //{ - // var paramType = parameters[0].ParameterType; - // if (paramType.IsInstanceOfType(message)) - // return message; // 类型兼容则直接使用 - - // var json = message.ToString(); - // messageObj= message.de JsonConvert.DeserializeObject(json, targetType); - //} - - object[] args = null; + List? messageObj = null; if (existParameters) { + messageObj = new List(); var paramType = parameters[0].ParameterType; - // 类型转换逻辑 - object convertedMessage = paramType.IsInstanceOfType(message) - ? message - : ConvertMessage(message, paramType); - - args = new object[] { convertedMessage }; + foreach (var msg in messages) + { + var data = paramType != typeof(string) ? msg?.ToString()?.Deserialize(paramType) : msg; + if (data != null) + messageObj.Add(data); + } } - var result = method.Invoke(subscribe, args); + + var result = method.Invoke(subscribe, messageObj?.ToArray()); if (result is Task genericTask) { await genericTask.ConfigureAwait(false); @@ -254,33 +250,8 @@ namespace JiShe.CollectBus.Kafka { return ackResult.Ack; } - - //if (isGenericTask) - //{ - // object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { message } :null)!; - // if (result is ISubscribeAck ackResult) - // { - // return ackResult.Ack; - // } - //} - //else - //{ - // object? result = method.Invoke(subscribe, existParameters ? new[] { message } : null); - // if (result is ISubscribeAck ackResult) - // { - // return ackResult.Ack; - // } - //} return false; } - // 可扩展的类型转换器 - private static object ConvertMessage(object rawMessage, Type targetType) - { - if (targetType.IsInstanceOfType(rawMessage)) - return rawMessage; // 类型兼容则直接使用 - - var json = rawMessage.ToString(); - return JsonConvert.DeserializeObject(json, targetType); - } + } }