diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index a837636..91ff4ea 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -83,6 +83,7 @@ }, "Kafka": { "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", + "EnableFilter": true, "EnableAuthorization": false, "SecurityProtocol": "SASL_PLAINTEXT", "SaslMechanism": "PLAIN", diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index 0625304..da9258c 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -140,15 +140,16 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(TimeSpan.FromSeconds(1),cts.Token); continue; } - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); - - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } bool sucess= await messageHandler(result.Message.Key, result.Message.Value); if (sucess) @@ -207,14 +208,16 @@ namespace JiShe.CollectBus.Kafka.Consumer await Task.Delay(100, cts.Token); continue; } - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header值未匹配", result.Topic, result.Partition); - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } bool sucess = await messageHandler(result.Message.Value); if (sucess) @@ -293,14 +296,16 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - _logger.LogInformation("Kafka消费: {Topic} 分区 {Partition} Header未匹配", result.Topic, result.Partition); - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } messages.Add((result.Message.Value, result.TopicPartitionOffset)); //messages.Add(result.Message.Value); @@ -425,13 +430,16 @@ namespace JiShe.CollectBus.Kafka.Consumer } else if (result.Message.Value != null) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; - // 检查 Header 是否符合条件 - if (!headersFilter.Match(result.Message.Headers)) + if (bool.Parse(_configuration["KafkaConsumer:EnableFilter"]!)) { - //consumer.Commit(result); // 提交偏移量 - // 跳过消息 - continue; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_configuration["ServerTagName"]!) } }; + // 检查 Header 是否符合条件 + if (!headersFilter.Match(result.Message.Headers)) + { + //consumer.Commit(result); // 提交偏移量 + // 跳过消息 + continue; + } } messages.Add((result.Message.Value, result.TopicPartitionOffset)); //messages.Add(result.Message.Value);