From 78f9ef349adb24c57086345672c0a63de9442cf7 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Wed, 16 Apr 2025 18:46:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E9=BB=98=E8=AE=A4=E5=BA=8F=E5=88=97=E5=8C=96=E5=92=8C=E5=8F=8D?= =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96=20=E5=A2=9E=E5=8A=A0=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E6=B6=88=E8=B4=B9=20=E5=A2=9E=E5=8A=A0Header=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Consumer/ConsumerService.cs | 4 ++-- .../Consumer/IConsumerService.cs | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 5547919..0625304 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -131,7 +131,7 @@ namespace JiShe.CollectBus.Kafka.Consumer if (result == null || result.Message==null || result.Message.Value == null) { _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); - consumer.Commit(result); // 手动提交 + //consumer.Commit(result); // 手动提交 continue; } if (result.IsPartitionEOF) @@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Kafka.Consumer if (result == null || result.Message==null || result.Message.Value == null) { _logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL"); - consumer.Commit(result); // 手动提交 + //consumer.Commit(result); // 手动提交 continue; } if (result.IsPartitionEOF) diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs index 3925014..d86dba8 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs @@ -33,10 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer /// Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; void Unsubscribe() where TKey : notnull where TValue : class; }