优化kafka
增加默认序列化和反序列化 增加批量消费 增加Header消费过滤
This commit is contained in:
parent
eed68d0fe0
commit
78f9ef349a
@ -131,7 +131,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
if (result == null || result.Message==null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||||
consumer.Commit(result); // 手动提交
|
//consumer.Commit(result); // 手动提交
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
@ -198,7 +198,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
if (result == null || result.Message==null || result.Message.Value == null)
|
if (result == null || result.Message==null || result.Message.Value == null)
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
_logger.LogWarning($"Kafka消费: {result?.Topic} 分区 {result?.Partition} 值为NULL");
|
||||||
consumer.Commit(result); // 手动提交
|
//consumer.Commit(result); // 手动提交
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (result.IsPartitionEOF)
|
if (result.IsPartitionEOF)
|
||||||
|
|||||||
@ -33,10 +33,13 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
Task SubscribeAsync<TValue>(string[] topics, Func<TValue, Task<bool>> messageHandler, string? groupId = null) where TValue : class;
|
||||||
|
|
||||||
|
Task SubscribeBatchAsync<TKey, TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
|
Task SubscribeBatchAsync<TKey, TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
|
||||||
|
|
||||||
Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
|
Task SubscribeBatchAsync<TValue>(string topic, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
|
||||||
|
|
||||||
|
Task SubscribeBatchAsync<TValue>(string[] topics, Func<IEnumerable<TValue>, Task<bool>> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
|
||||||
|
|
||||||
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
void Unsubscribe<TKey, TValue>() where TKey : notnull where TValue : class;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user