From 3790f3918e4bb9cc4511e1fb7ef1bf4875b17a31 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Wed, 16 Apr 2025 20:41:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Header=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E8=BF=87=E6=BB=A4=E5=BC=80=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/JiShe.CollectBus.Host/appsettings.json | 1 + .../Consumer/ConsumerService.cs | 64 +++++++++++-------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index a837636..91ff4ea 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -83,6 +83,7 @@ }, "Kafka": { "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", + "EnableFilter": true, "EnableAuthorization": false, "SecurityProtocol": "SASL_PLAINTEXT", "SaslMechanism": "PLAIN", diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 0625304..da9258c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -140,15 +140,16 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); continue; } - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); - - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } bool sucess= await messageHandler(result.Message.Key, result.Message.Value); if (sucess) @@ -207,14 +208,16 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(100, cts.Token); continue; } - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header值未匹配", result.Topic, result.Partition); - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } bool sucess = await messageHandler(result.Message.Value); if (sucess) @@ -293,14 +296,16 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } messages.Add((result.Message.Value, result.TopicPartitionOffset)); //messages.Add(result.Message.Value); @@ -425,13 +430,16 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } messages.Add((result.Message.Value, result.TopicPartitionOffset)); //messages.Add(result.Message.Value);