From 1e85f894a8aae3ca0ce75bbb0f7b041dc3390611 Mon Sep 17 00:00:00 2001 From: zenghongyao <873884283@qq.com> Date: Tue, 6 May 2025 18:41:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96kafka=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Producer/ProducerService.cs | 12 +++++++----- .../T37612012ProtocolPlugin.cs | 18 ++++++++---------- .../BuildSendDatas/Build3761SendData.cs | 18 +++++++++++++----- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index f37f3cc..d3f9588 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -7,6 +7,7 @@ using System.Text; using System.Threading.Tasks; using Confluent.Kafka; using JiShe.CollectBus.Common; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; @@ -69,11 +70,11 @@ namespace JiShe.CollectBus.Kafka.Producer { BootstrapServers = _kafkaOptionConfig.BootstrapServers, //AllowAutoCreateTopics = true, - QueueBufferingMaxKbytes = 2097151, // 4194304, // 修改缓冲区最大为2GB,默认为1GB - QueueBufferingMaxMessages = 100000, // 修改缓冲区消息最大为100000条 + QueueBufferingMaxKbytes = 4_194_304, // 4_194_304 2_097_151 // 修改缓冲区最大为2GB,默认为1GB + QueueBufferingMaxMessages = int.MaxValue, // 修改缓冲区消息条 CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32_768, // 修改批次大小为32K - LingerMs = 10, // 修改等待时间为20ms,默认为5ms + LingerMs = 20, // 修改等待时间为20ms,默认为5ms Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader MessageSendMaxRetries = 50, // 消息发送失败最大重试50次 MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs @@ -175,7 +176,7 @@ namespace JiShe.CollectBus.Kafka.Producer var producer = GetProducer(typeKey); if (partition.HasValue) { - var topicPartition = new TopicPartition(topic, partition.Value); + var topicPartition = new TopicPartition(topic, new Partition(partition.Value)); producer.Produce(topicPartition, message, deliveryHandler); } else @@ -209,7 +210,8 @@ namespace JiShe.CollectBus.Kafka.Producer var producer = GetProducer(typeKey); if (partition.HasValue) { - var topicPartition = new TopicPartition(topic, partition.Value); + var topicPartition = new TopicPartition(topic,new Partition(partition.Value)); + //_logger.LogError($"push消息:{topic}-{partition.Value}"); producer.Produce(topicPartition, message, deliveryHandler); } else diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs index a469147..247b03f 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/T37612012ProtocolPlugin.cs @@ -610,7 +610,7 @@ namespace JiShe.CollectBus.Protocol.T37612012 { unitData = new UnitData { - HexMessageList = hexStringList.GetRange(14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符(16H) + HexMessageList = hexStringList. (14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符(16H) }; unitData.HexMessageString = string.Join(" ", unitData.HexMessageList); } @@ -714,11 +714,9 @@ namespace JiShe.CollectBus.Protocol.T37612012 /// /// public virtual List Generate_DataUnit(DataTimeMark timeMark) - { - List values = new List - { - SplitDataTime(timeMark.DataTime)//数据时间 - }; + { + List values = new List(); + values.AddRange(SplitDataTime(timeMark.DataTime));//数据时间 if (timeMark.Density > 0) values.Add(timeMark.Density.HexToDecStr().PadLeft(2, '0'));//密度 if (timeMark.Point > 0) @@ -727,13 +725,13 @@ namespace JiShe.CollectBus.Protocol.T37612012 } - private string SplitDataTime(DateTime dataTime) + private List SplitDataTime(DateTime dataTime) { //2101060815 - List values = new List() { $"{dataTime}:YY", $"{dataTime}:MM", $"{dataTime}:dd", $"{dataTime}:HH", $"{dataTime}:mm", }; - + List values = new List() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", }; values.Reverse(); - return string.Join("", values); + return values; + //return string.Join("", values); } #endregion diff --git a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs index be7aac5..76f6b88 100644 --- a/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs +++ b/shared/JiShe.CollectBus.Common/BuildSendDatas/Build3761SendData.cs @@ -1606,13 +1606,21 @@ namespace JiShe.CollectBus.Common.BuildSendDatas /// private static string GetCS(List userData) { - byte sum = 0; - foreach (var d in userData) + try { - var b = Convert.ToByte(d, 16); - sum += b; + byte sum = 0; + foreach (var d in userData) + { + var b = Convert.ToByte(d, 16); + sum += b; + } + return sum.ToString("X2"); } - return sum.ToString("X2"); + catch(Exception) + { + throw; + } + } ///