添加Header消息过滤开关

This commit is contained in:
zenghongyao 2025-04-16 20:41:52 +08:00
parent 78f9ef349a
commit 3790f3918e
2 changed files with 37 additions and 28 deletions

View File

@ -83,6 +83,7 @@
}, },
"Kafka": { "Kafka": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableFilter": true,
"EnableAuthorization": false, "EnableAuthorization": false,
"SecurityProtocol": "SASL_PLAINTEXT", "SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN", "SaslMechanism": "PLAIN",

View File

@ -140,16 +140,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); await Task.Delay(TimeSpan.FromSeconds(1),cts.Token);
continue; continue;
} }
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition);
//consumer.Commit(result); // 提交偏移量 //consumer.Commit(result); // 提交偏移量
// 跳过消息 // 跳过消息
continue; continue;
} }
}
bool sucess= await messageHandler(result.Message.Key, result.Message.Value); bool sucess= await messageHandler(result.Message.Key, result.Message.Value);
if (sucess) if (sucess)
{ {
@ -207,15 +208,17 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(100, cts.Token); await Task.Delay(100, cts.Token);
continue; continue;
} }
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header值未匹配", result.Topic, result.Partition);
//consumer.Commit(result); // 提交偏移量 //consumer.Commit(result); // 提交偏移量
// 跳过消息 // 跳过消息
continue; continue;
} }
}
bool sucess = await messageHandler(result.Message.Value); bool sucess = await messageHandler(result.Message.Value);
if (sucess) if (sucess)
consumer.Commit(result); // 手动提交 consumer.Commit(result); // 手动提交
@ -292,16 +295,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers)) if (!headersFilter.Match(result.Message.Headers))
{ {
_logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition);
//consumer.Commit(result); // 提交偏移量 //consumer.Commit(result); // 提交偏移量
// 跳过消息 // 跳过消息
continue; continue;
} }
}
messages.Add((result.Message.Value, result.TopicPartitionOffset)); messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value); //messages.Add(result.Message.Value);
} }
@ -424,6 +429,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
} }
else if (result.Message.Value != null) else if (result.Message.Value != null)
{
if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!))
{ {
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } };
// 检查 Header 是否符合条件 // 检查 Header 是否符合条件
@ -433,6 +440,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
// 跳过消息 // 跳过消息
continue; continue;
} }
}
messages.Add((result.Message.Value, result.TopicPartitionOffset)); messages.Add((result.Message.Value, result.TopicPartitionOffset));
//messages.Add(result.Message.Value); //messages.Add(result.Message.Value);
} }