diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 3e1c2ea..8f5449a 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -215,11 +215,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS return aa == null; } - [KafkaSubscribe(["test-topic"])] + [KafkaSubscribe(["test-topic1"])] - public async Task KafkaSubscribeAsync(object obj) + public async Task KafkaSubscribeAsync() // TestSubscribe obj { + var obj=string.Empty; _logger.LogWarning($"收到订阅消息: {obj}"); return SubscribeAck.Success(); } } + +public class TestSubscribe +{ + public string Topic { get; set; } + public int Val { get; set; } +} + diff --git a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml index aaadf3f..b438e18 100644 --- a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,6 +16,7 @@ 后端服务 + diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 6cf2b60..d3ce904 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -1,4 +1,7 @@ using Confluent.Kafka; +using DeviceDetectorNET; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Consumer; using Microsoft.AspNetCore.Builder; @@ -8,6 +11,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Primitives; using Newtonsoft.Json; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; @@ -96,6 +100,7 @@ namespace JiShe.CollectBus.Kafka }); } + /// /// 处理消息 /// @@ -106,15 +111,18 @@ namespace JiShe.CollectBus.Kafka private static async Task ProcessMessageAsync(string message, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); - if (parameters.Length != 1) - return true; - - var paramType = parameters[0].ParameterType; - var messageObj = paramType == typeof(string)? message: JsonConvert.DeserializeObject(message, paramType); - - if (method.ReturnType == typeof(Task)) + bool isGenericTask = method.ReturnType.IsGenericType + && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>); + bool existParameters = parameters.Length > 0; + dynamic? messageObj= null; + if (existParameters) { - object? result = await (Task)method.Invoke(subscribe, new[] { messageObj })!; + var paramType = parameters[0].ParameterType; + messageObj = paramType == typeof(string) ? message : message.Deserialize(paramType); + } + if (isGenericTask) + { + object? result = await (Task)method.Invoke(subscribe, existParameters? new[] { messageObj }:null)!; if (result is ISubscribeAck ackResult) { return ackResult.Ack; @@ -122,13 +130,12 @@ namespace JiShe.CollectBus.Kafka } else { - object? result = method.Invoke(subscribe, new[] { messageObj }); + object? result = method.Invoke(subscribe, existParameters ? new[] { messageObj } : null); if (result is ISubscribeAck ackResult) { return ackResult.Ack; } } - return false; }