diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e4c078d..3e1c2ea 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -217,9 +217,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [KafkaSubscribe(["test-topic"])] - public async Task KafkaSubscribeAsync(string obj) + public async Task KafkaSubscribeAsync(object obj) { _logger.LogWarning($"收到订阅消息: {obj}"); - return await Task.FromResult(true); + return SubscribeAck.Success(); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs b/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs new file mode 100644 index 0000000..ffb30ef --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public interface ISubscribeAck + { + /// + /// 是否成功标记 + /// + bool Ack { get; set; } + + /// + /// 消息 + /// + string? Msg { get; set; } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 6a5f5cb..6cf2b60 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -115,14 +115,18 @@ namespace JiShe.CollectBus.Kafka if (method.ReturnType == typeof(Task)) { object? result = await (Task)method.Invoke(subscribe, new[] { messageObj })!; - if (result is bool success) - return success; + if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } } else { object? result = method.Invoke(subscribe, new[] { messageObj }); - if (result is bool success) - return success; + if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } } return false; diff --git a/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs b/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs new file mode 100644 index 0000000..83eaa49 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs @@ -0,0 +1,75 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using static System.Runtime.InteropServices.JavaScript.JSType; + +namespace JiShe.CollectBus.Kafka +{ + public class SubscribeResult: ISubscribeAck + { + /// + /// 是否成功 + /// + public bool Ack { get; set; } + + /// + /// 消息 + /// + public string? Msg { get; set; } + + + /// + /// 成功 + /// + /// 消息 + public SubscribeResult Success(string? msg = null) + { + Ack = true; + Msg = msg; + return this; + } + + /// + /// 失败 + /// + /// + /// + /// + /// + public SubscribeResult Fail(string? msg = null) + { + Msg = msg; + Ack = false; + return this; + } + } + + public static partial class SubscribeAck + { + + /// + /// 成功 + /// + /// 消息 + /// + public static ISubscribeAck Success(string? msg = null) + { + return new SubscribeResult().Success(msg); + } + + + /// + /// 失败 + /// + /// 消息 + /// + public static ISubscribeAck Fail(string? msg = null) + { + return new SubscribeResult().Fail(msg); + } + } + +}