优化kafka发布消息

This commit is contained in:
zenghongyao 2025-05-06 18:41:23 +08:00
parent c4335aa3b7
commit 1e85f894a8
3 changed files with 28 additions and 20 deletions

View File

@ -7,6 +7,7 @@ using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
@ -69,11 +70,11 @@ namespace JiShe.CollectBus.Kafka.Producer
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
//AllowAutoCreateTopics = true,
QueueBufferingMaxKbytes = 2097151, // 4194304, // 修改缓冲区最大为2GB默认为1GB
QueueBufferingMaxMessages = 100000, // 修改缓冲区消息最大为100000
QueueBufferingMaxKbytes = 4_194_304, // 4_194_304 2_097_151 // 修改缓冲区最大为2GB默认为1GB
QueueBufferingMaxMessages = int.MaxValue, // 修改缓冲区消息
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 10, // 修改等待时间为20ms默认为5ms
LingerMs = 20, // 修改等待时间为20ms默认为5ms
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs
@ -175,7 +176,7 @@ namespace JiShe.CollectBus.Kafka.Producer
var producer = GetProducer<TKey, TValue>(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
var topicPartition = new TopicPartition(topic, new Partition(partition.Value));
producer.Produce(topicPartition, message, deliveryHandler);
}
else
@ -209,7 +210,8 @@ namespace JiShe.CollectBus.Kafka.Producer
var producer = GetProducer<Null, TValue>(typeKey);
if (partition.HasValue)
{
var topicPartition = new TopicPartition(topic, partition.Value);
var topicPartition = new TopicPartition(topic,new Partition(partition.Value));
//_logger.LogError($"push消息{topic}-{partition.Value}");
producer.Produce(topicPartition, message, deliveryHandler);
}
else

View File

@ -610,7 +610,7 @@ namespace JiShe.CollectBus.Protocol.T37612012
{
unitData = new UnitData
{
HexMessageList = hexStringList.GetRange(14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符16H
HexMessageList = hexStringList. (14, hexStringList.Count - 14 - 2) //总数字节数-固定长度报文头-控制域C-地址域A-校验和CS-结束字符16H
};
unitData.HexMessageString = string.Join(" ", unitData.HexMessageList);
}
@ -715,10 +715,8 @@ namespace JiShe.CollectBus.Protocol.T37612012
/// <returns></returns>
public virtual List<string> Generate_DataUnit(DataTimeMark timeMark)
{
List<string> values = new List<string>
{
SplitDataTime(timeMark.DataTime)//数据时间
};
List<string> values = new List<string>();
values.AddRange(SplitDataTime(timeMark.DataTime));//数据时间
if (timeMark.Density > 0)
values.Add(timeMark.Density.HexToDecStr().PadLeft(2, '0'));//密度
if (timeMark.Point > 0)
@ -727,13 +725,13 @@ namespace JiShe.CollectBus.Protocol.T37612012
}
private string SplitDataTime(DateTime dataTime)
private List<string> SplitDataTime(DateTime dataTime)
{
//2101060815
List<string> values = new List<string>() { $"{dataTime}:YY", $"{dataTime}:MM", $"{dataTime}:dd", $"{dataTime}:HH", $"{dataTime}:mm", };
List<string> values = new List<string>() { $"{dataTime:yy}", $"{dataTime:MM}", $"{dataTime:dd}", $"{dataTime:HH}", $"{dataTime:mm}", };
values.Reverse();
return string.Join("", values);
return values;
//return string.Join("", values);
}
#endregion

View File

@ -1606,13 +1606,21 @@ namespace JiShe.CollectBus.Common.BuildSendDatas
/// <returns></returns>
private static string GetCS(List<string> userData)
{
byte sum = 0;
foreach (var d in userData)
try
{
var b = Convert.ToByte(d, 16);
sum += b;
byte sum = 0;
foreach (var d in userData)
{
var b = Convert.ToByte(d, 16);
sum += b;
}
return sum.ToString("X2");
}
return sum.ToString("X2");
catch(Exception)
{
throw;
}
}
/// <summary>