修复kafka基准测试

This commit is contained in:
zenghongyao 2025-04-30 09:50:50 +08:00
parent b0bbf67f87
commit 4e30cc13f1
7 changed files with 89 additions and 88 deletions

View File

@ -31,10 +31,14 @@
<ProjectReference Include="..\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.Kafka\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup> </ItemGroup>
<!--<ItemGroup> <!--注意基准测试需要引用dll测试-->
<Reference Include="JiShe.CollectBus.Kafka"> <!--<ItemGroup>
<HintPath>Lib\JiShe.CollectBus.Kafka.dll</HintPath> <Reference Include="JiShe.CollectBus.Common">
</Reference> <HintPath>Lib\JiShe.CollectBus.Common.dll</HintPath>
</ItemGroup>--> </Reference>
<Reference Include="JiShe.CollectBus.Kafka">
<HintPath>Lib\JiShe.CollectBus.Kafka.dll</HintPath>
</Reference>
</ItemGroup>-->
</Project> </Project>

View File

@ -1,8 +1,10 @@
using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Jobs;
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
@ -24,11 +26,11 @@ namespace JiShe.CollectBus.Kafka.Test
{ {
// 每批消息数量 // 每批消息数量
[Params(1000, 10000, 100000)] [Params(1000, 10000, 100000, 1000000)]
public int N; public int N;
public ServiceProvider _serviceProvider; public ServiceProvider _serviceProvider;
public IConsumerService _consumerService; public IConsumerService _consumerService;
public IProducerService _producerService; public IProducerService _producerService;
public string topic = "test-topic1"; public string topic = "test-topic1";
[GlobalSetup] [GlobalSetup]
@ -40,13 +42,22 @@ namespace JiShe.CollectBus.Kafka.Test
.AddJsonFile("appsettings.json") .AddJsonFile("appsettings.json")
.Build(); .Build();
// 直接读取配置项 // 直接读取配置项
var greeting = config["ServerTagName"]; var greeting = config["Kafka:ServerTagName"];
Console.WriteLine(greeting); // 输出: Hello, World! Console.WriteLine(greeting); // 输出: Hello, World!
// 创建服务容器 // 创建服务容器
var services = new ServiceCollection(); var services = new ServiceCollection();
// 注册 IConfiguration 实例 // 注册 IConfiguration 实例
services.AddSingleton<IConfiguration>(config); services.AddSingleton<IConfiguration>(config);
services.Configure<KafkaOptionConfig>(options =>
{
config.GetSection("Kafka").Bind(options);
});
services.Configure<ServerApplicationOptions>(options =>
{
config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
});
// 初始化日志 // 初始化日志
Log.Logger = new LoggerConfiguration() Log.Logger = new LoggerConfiguration()
.ReadFrom.Configuration(config) // 从 appsettings.json 读取配置 .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置
@ -61,6 +72,8 @@ namespace JiShe.CollectBus.Kafka.Test
services.AddSingleton<IAdminClientService, AdminClientService>(); services.AddSingleton<IAdminClientService, AdminClientService>();
services.AddSingleton<IProducerService, ProducerService>(); services.AddSingleton<IProducerService, ProducerService>();
services.AddSingleton<IConsumerService, ConsumerService>(); services.AddSingleton<IConsumerService, ConsumerService>();
services.AddSingleton<KafkaPollyPipeline>();
services.AddTransient<KafkaSubscribeTest>();
// 构建ServiceProvider // 构建ServiceProvider
_serviceProvider = services.BuildServiceProvider(); _serviceProvider = services.BuildServiceProvider();
@ -75,7 +88,7 @@ namespace JiShe.CollectBus.Kafka.Test
//await adminClientService.DeleteTopicAsync(topic); //await adminClientService.DeleteTopicAsync(topic);
// 创建 topic // 创建 topic
adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter(); //adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter();
_consumerService = _serviceProvider.GetRequiredService<IConsumerService>(); _consumerService = _serviceProvider.GetRequiredService<IConsumerService>();
@ -100,7 +113,7 @@ namespace JiShe.CollectBus.Kafka.Test
List<Task> tasks = new(); List<Task> tasks = new();
for (int i = 0; i < N; ++i) for (int i = 0; i < N; ++i)
{ {
var task = _producerService.ProduceAsync<string>(topic, i.ToString(),null); var task = _producerService.ProduceAsync<string>(topic, i.ToString(), null);
} }
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
} }

View File

@ -1,8 +1,5 @@
// See https://aka.ms/new-console-template for more information // See https://aka.ms/new-console-template for more information
using BenchmarkDotNet.Configs; using JiShe.CollectBus.Common;
using BenchmarkDotNet.Running;
using Confluent.Kafka;
using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
@ -17,10 +14,6 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Serilog; using Serilog;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.PortableExecutable;
using System.Text.Json;
#region #region
//var summary = BenchmarkRunner.Run<KafkaProduceBenchmark>(); //var summary = BenchmarkRunner.Run<KafkaProduceBenchmark>();
@ -58,7 +51,18 @@ var host = Host.CreateDefaultBuilder(args)
logging.ClearProviders(); logging.ClearProviders();
logging.AddSerilog(); logging.AddSerilog();
}); });
services.Configure<KafkaOptionConfig>(config.GetSection("Kafka")); //services.Configure<KafkaOptionConfig>(config.GetSection("Kafka"));
//services.Configure<ServerApplicationOptions>(config.GetSection("ServerApplicationOptions"));
var dss = config.GetSection("Kafka");
services.Configure<KafkaOptionConfig>(options =>
{
config.GetSection("Kafka").Bind(options);
});
services.Configure<ServerApplicationOptions>(options =>
{
config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
});
services.AddSingleton<IAdminClientService, AdminClientService>(); services.AddSingleton<IAdminClientService, AdminClientService>();
services.AddSingleton<IProducerService, ProducerService>(); services.AddSingleton<IProducerService, ProducerService>();
@ -86,8 +90,20 @@ var host = Host.CreateDefaultBuilder(args)
var loggerFactory = host.Services.GetRequiredService<ILoggerFactory>(); var loggerFactory = host.Services.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<Program>(); var logger = loggerFactory.CreateLogger<Program>();
logger.LogInformation("程序启动"); logger.LogInformation("程序启动");
var _kafkaPollyPipeline = host.Services.GetRequiredService<KafkaPollyPipeline>();
if (_kafkaPollyPipeline == null)
{
logger.LogInformation("KafkaPollyPipeline未注册");
}
var adminClientService = host.Services.GetRequiredService<IAdminClientService>(); var adminClientService = host.Services.GetRequiredService<IAdminClientService>();
var configuration = host.Services.GetRequiredService<IConfiguration>(); var configuration = host.Services.GetRequiredService<IConfiguration>();
var kafkaOptionConfig=host.Services.GetRequiredService<IOptions<ServerApplicationOptions>>();
string topic = ProtocolConst.TESTTOPIC; string topic = ProtocolConst.TESTTOPIC;
//await adminClientService.DeleteTopicAsync(topic); //await adminClientService.DeleteTopicAsync(topic);
// 创建 topic // 创建 topic

View File

@ -13,7 +13,8 @@
"DotNetCore.CAP": "Warning", "DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information", "Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning", "Microsoft.EntityFrameworkCore": "Warning",
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning",
"Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
} }
}, },
"WriteTo": [ "WriteTo": [
@ -34,7 +35,7 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100" "CorsOrigins": "http://localhost:4200,http://localhost:3100"
}, },
"ConnectionStrings": { "ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
@ -43,7 +44,7 @@
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"MaxPoolSize": "50", "MaxPoolSize": "50",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "13"
}, },
"Jwt": { "Jwt": {
"Audience": "JiShe.CollectBus", "Audience": "JiShe.CollectBus",
@ -51,16 +52,11 @@
"Issuer": "JiShe.CollectBus", "Issuer": "JiShe.CollectBus",
"ExpirationTime": 2 "ExpirationTime": 2
}, },
"HealthCheck": { "HealthChecks": {
"IsEnable": true, "IsEnable": true,
"MySql": { "HealthCheckDatabaseName": "HealthChecks",
"IsEnable": true "EvaluationTimeInSeconds": 10,
}, "MinimumSecondsBetweenFailureNotifications": 60
"Pings": {
"IsEnable": true,
"Host": "https://www.baidu.com/",
"TimeOut": 5000
}
}, },
"SwaggerConfig": [ "SwaggerConfig": [
{ {
@ -74,14 +70,6 @@
"Version": "V1" "Version": "V1"
} }
], ],
"Cap": {
"RabbitMq": {
"HostName": "118.190.144.92",
"UserName": "collectbus",
"Password": "123456",
"Port": 5672
}
},
"Kafka": { "Kafka": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableFilter": true, "EnableFilter": true,
@ -92,48 +80,18 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus99" "ServerTagName": "JiSheCollectBus4",
//"Topic": { "FirstCollectionTime": "2025-04-22 16:07:00"
// "ReplicationFactor": 3,
// "NumPartitions": 1000
//}
}, },
//"Kafka": {
// "Connections": {
// "Default": {
// "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
// // "SecurityProtocol": "SASL_PLAINTEXT",
// // "SaslMechanism": "PLAIN",
// // "SaslUserName": "lixiao",
// // "SaslPassword": "lixiao1980",
// }
// },
// "Consumer": {
// "GroupId": "JiShe.CollectBus"
// },
// "Producer": {
// "MessageTimeoutMs": 6000,
// "Acks": -1
// },
// "Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000
// },
// "EventBus": {
// "GroupId": "JiShe.CollectBus",
// "TopicName": "DefaultTopicName"
// }
//},
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
"Password": "root", "Password": "root",
"ClusterList": [ "192.168.1.9:6667" ], "ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2, "PoolSize": 32,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus3",
"Cassandra": { "Cassandra": {
"ReplicationStrategy": { "ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy "Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy
@ -156,6 +114,12 @@
"Port": 9043, "Port": 9043,
"DataCenter": "dc1", "DataCenter": "dc1",
"Rack": "RAC2" "Rack": "RAC2"
},
{
"Host": "192.168.1.9",
"Port": 9044,
"DataCenter": "dc1",
"Rack": "RAC2"
} }
], ],
"Username": "admin", "Username": "admin",
@ -176,5 +140,17 @@
"SerialConsistencyLevel": "Serial", "SerialConsistencyLevel": "Serial",
"DefaultIdempotence": true "DefaultIdempotence": true
} }
} },
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus4",
"SystemType": "Energy",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00",
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin"
},
"PlugInFolder": ""
} }

View File

@ -1,16 +1,8 @@
using Confluent.Kafka; using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.CircuitBreaker; using Polly.CircuitBreaker;
using Polly.Retry; using Polly.Retry;
using Polly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Polly.Contrib.WaitAndRetry;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Kafka.Producer;
namespace JiShe.CollectBus.Kafka.Internal namespace JiShe.CollectBus.Kafka.Internal
{ {

View File

@ -96,7 +96,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (taskData != null) if (taskData != null)
{ {
// 更新 // 更新
meter.Timestamps = taskData.Timestamps; meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived=true; taskData.IsReceived=true;
taskData.ReceivedMessageHexString= analysisBaseDto.HexMessage; taskData.ReceivedMessageHexString= analysisBaseDto.HexMessage;
taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty; taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty;
@ -197,7 +197,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (taskData != null) if (taskData != null)
{ {
// 更新 // 更新
meter.Timestamps = taskData.Timestamps; meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived = true; taskData.IsReceived = true;
taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage; taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty; taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;