diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
index 5547919..0625304 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
@@ -131,7 +131,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
- consumer.Commit(result); // 手动提交
+ //consumer.Commit(result); // 手动提交
continue;
}
if (result.IsPartitionEOF)
@@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
if (result == null || result.Message==null || result.Message.Value == null)
{
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
- consumer.Commit(result); // 手动提交
+ //consumer.Commit(result); // 手动提交
continue;
}
if (result.IsPartitionEOF)
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
index 3925014..d86dba8 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/IConsumerService.cs
@@ -33,10 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class;
+ Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
- Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
+ Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
+
+ Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
void Unsubscribe() where TKey : notnull where TValue : class;
}