From 7b1f04a45207f0826aaeb46da36c59ea8b3f4638 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 15 Apr 2025 16:45:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Samples/SampleAppService.cs | 4 +- .../ISubscribeAck.cs | 21 ++++++ .../KafkaSubcribesExtensions.cs | 12 ++- .../SubscribeResult.cs | 75 +++++++++++++++++++ 4 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e4c078d..3e1c2ea 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -217,9 +217,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [KafkaSubscribe(["test-topic"])] - public async Task KafkaSubscribeAsync(string obj) + public async Task KafkaSubscribeAsync(object obj) { _logger.LogWarning($"收到订阅消息: {obj}"); - return await Task.FromResult(true); + return SubscribeAck.Success(); } } diff --git a/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs b/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs new file mode 100644 index 0000000..ffb30ef --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/ISubscribeAck.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public interface ISubscribeAck + { + /// + /// 是否成功标记 + /// + bool Ack { get; set; } + + /// + /// 消息 + /// + string? Msg { get; set; } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index 6a5f5cb..6cf2b60 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -115,14 +115,18 @@ namespace JiShe.CollectBus.Kafka if (method.ReturnType == typeof(Task)) { object? result = await (Task)method.Invoke(subscribe, new[] { messageObj })!; - if (result is bool success) - return success; + if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } } else { object? result = method.Invoke(subscribe, new[] { messageObj }); - if (result is bool success) - return success; + if (result is ISubscribeAck ackResult) + { + return ackResult.Ack; + } } return false; diff --git a/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs b/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs new file mode 100644 index 0000000..83eaa49 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/SubscribeResult.cs @@ -0,0 +1,75 @@ +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using static System.Runtime.InteropServices.JavaScript.JSType; + +namespace JiShe.CollectBus.Kafka +{ + public class SubscribeResult: ISubscribeAck + { + /// + /// 是否成功 + /// + public bool Ack { get; set; } + + /// + /// 消息 + /// + public string? Msg { get; set; } + + + /// + /// 成功 + /// + /// 消息 + public SubscribeResult Success(string? msg = null) + { + Ack = true; + Msg = msg; + return this; + } + + /// + /// 失败 + /// + /// + /// + /// + /// + public SubscribeResult Fail(string? msg = null) + { + Msg = msg; + Ack = false; + return this; + } + } + + public static partial class SubscribeAck + { + + /// + /// 成功 + /// + /// 消息 + /// + public static ISubscribeAck Success(string? msg = null) + { + return new SubscribeResult().Success(msg); + } + + + /// + /// 失败 + /// + /// 消息 + /// + public static ISubscribeAck Fail(string? msg = null) + { + return new SubscribeResult().Fail(msg); + } + } + +}