Compare commits

..

2 Commits

Author SHA1 Message Date
c4335aa3b7 修改代码 2025-05-06 15:28:00 +08:00
f96eb27353 修改代码 2025-05-06 15:21:31 +08:00
3 changed files with 4 additions and 6 deletions

View File

@ -69,7 +69,8 @@ namespace JiShe.CollectBus.Kafka.Producer
{ {
BootstrapServers = _kafkaOptionConfig.BootstrapServers, BootstrapServers = _kafkaOptionConfig.BootstrapServers,
//AllowAutoCreateTopics = true, //AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 4194304, // 修改缓冲区最大为2GB默认为1GB QueueBufferingMaxKbytes = 2097151, // 4194304, // 修改缓冲区最大为2GB默认为1GB
QueueBufferingMaxMessages = 100000, // 修改缓冲区消息最大为100000条
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 10, // 修改等待时间为20ms默认为5ms LingerMs = 10, // 修改等待时间为20ms默认为5ms
@ -141,7 +142,6 @@ namespace JiShe.CollectBus.Kafka.Producer
var producer = GetProducer<Null, TValue>(typeKey); var producer = GetProducer<Null, TValue>(typeKey);
var message = new Message<Null, TValue> var message = new Message<Null, TValue>
{ {
//Key= _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
@ -200,7 +200,6 @@ namespace JiShe.CollectBus.Kafka.Producer
{ {
var message = new Message<Null, TValue> var message = new Message<Null, TValue>
{ {
//Key = _kafkaOptionConfig.ServerTagName,
Value = value, Value = value,
Headers = new Headers{ Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }

View File

@ -173,7 +173,7 @@ namespace JiShe.CollectBus.DataChannels
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning(ex, "Kafka推送{topicName}主题重试中({Retry}/{MaxRetries})", topicName, retry + 1, maxRetries); _logger.LogWarning(ex, "Kafka推送{topicName}主题分区{partition}重试中({Retry}/{MaxRetries})", topicName, partition, retry + 1, maxRetries);
if (retry == maxRetries - 1) throw; if (retry == maxRetries - 1) throw;
await Task.Delay(1000 * (retry + 1)); await Task.Delay(1000 * (retry + 1));
} }

View File

@ -80,7 +80,6 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus4",
"FirstCollectionTime": "2025-04-22 16:07:00" "FirstCollectionTime": "2025-04-22 16:07:00"
}, },
"IoTDBOptions": { "IoTDBOptions": {
@ -142,7 +141,7 @@
} }
}, },
"ServerApplicationOptions": { "ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus4", "ServerTagName": "JiSheCollectBus99",
"SystemType": "Energy", "SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00", "FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00", "AutomaticVerificationTime": "16:07:00",