// Snapshot metadata: 2025-04-18 09:29:38 +08:00, 109 lines, 3.6 KiB, C#

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Test
{
/// <summary>
/// BenchmarkDotNet benchmark that publishes batches of <see cref="N"/> string
/// messages to a Kafka topic through <see cref="IProducerService"/>, comparing
/// two <c>ProduceAsync</c> overloads.
/// </summary>
[SimpleJob(RuntimeMoniker.Net80)]
// Alternative job, currently disabled:
//[SimpleJob(RuntimeMoniker.NativeAot80)]
[RPlotExporter]
public class KafkaProduceBenchmark
{
    /// <summary>Number of messages produced per benchmark invocation.</summary>
    [Params(1000, 10000, 100000)]
    public int N;

    /// <summary>Root DI container built in <see cref="Setup"/>; disposed in <see cref="Cleanup"/>.</summary>
    public ServiceProvider _serviceProvider;

    /// <summary>Consumer service resolved from the container (not exercised by the benchmarks in this class).</summary>
    public IConsumerService _consumerService;

    /// <summary>Producer service under test.</summary>
    public IProducerService _producerService;

    /// <summary>Topic that every benchmark publishes to.</summary>
    public string topic = "test-topic1";

    /// <summary>
    /// Loads configuration, wires up logging and the Kafka services, and makes sure
    /// the benchmark topic has been created before any measurement starts.
    /// </summary>
    [GlobalSetup]
    public void Setup()
    {
        // Build configuration from appsettings.json in the current working directory.
        var config = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json")
            .Build();

        // Read a single setting directly and echo it (prints whatever "ServerTagName" holds).
        var serverTagName = config["ServerTagName"];
        Console.WriteLine(serverTagName);

        // Create the service collection and register the IConfiguration instance.
        var services = new ServiceCollection();
        services.AddSingleton<IConfiguration>(config);

        // Initialise Serilog from the same configuration.
        Log.Logger = new LoggerConfiguration()
            .ReadFrom.Configuration(config)
            .CreateLogger();

        // Route Microsoft.Extensions.Logging exclusively through Serilog.
        services.AddLogging(logging =>
        {
            logging.ClearProviders();
            logging.AddSerilog();
        });

        services.AddSingleton<IAdminClientService, AdminClientService>();
        services.AddSingleton<IProducerService, ProducerService>();
        services.AddSingleton<IConsumerService, ConsumerService>();

        _serviceProvider = services.BuildServiceProvider();

        var loggerFactory = _serviceProvider.GetRequiredService<ILoggerFactory>();
        var logger = loggerFactory.CreateLogger<Program>();
        logger.LogInformation("程序启动");

        var adminClientService = _serviceProvider.GetRequiredService<IAdminClientService>();

        // Optional reset of the topic before the run (kept disabled as in the original):
        //adminClientService.DeleteTopicAsync(topic).ConfigureAwait(false).GetAwaiter().GetResult();

        // Create the topic and block until the call has completed. Merely obtaining
        // the awaiter (without GetResult) would leave creation running in the
        // background and silently drop any failure. Setup stays synchronous to keep
        // its signature unchanged.
        // NOTE(review): the two numeric arguments are presumably partition count and
        // replication factor - confirm against IAdminClientService.
        adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter().GetResult();

        _consumerService = _serviceProvider.GetRequiredService<IConsumerService>();
        _producerService = _serviceProvider.GetRequiredService<IProducerService>();
    }

    /// <summary>
    /// Disposes the DI container (and with it any disposable singleton services)
    /// and flushes the Serilog logger once all benchmark iterations have finished.
    /// </summary>
    [GlobalCleanup]
    public void Cleanup()
    {
        _serviceProvider?.Dispose();
        Log.CloseAndFlush();
    }

    /// <summary>
    /// Produces <see cref="N"/> messages with the two-argument <c>ProduceAsync</c>
    /// overload and waits for all of the returned tasks to complete.
    /// </summary>
    [Benchmark]
    public async Task UseAsync()
    {
        List<Task> tasks = new(N);
        for (int i = 0; i < N; ++i)
        {
            var task = _producerService.ProduceAsync<string>(topic, i.ToString());
            tasks.Add(task);
        }
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Produces <see cref="N"/> messages with the three-argument <c>ProduceAsync</c>
    /// overload (explicit <c>null</c> third argument) and waits for all of the
    /// returned tasks to complete.
    /// </summary>
    /// <remarks>
    /// NOTE(review): judging by the method name this overload presumably goes through
    /// the librdkafka callback-based produce path - confirm against IProducerService.
    /// </remarks>
    [Benchmark]
    public async Task UseLibrd()
    {
        List<Task> tasks = new(N);
        for (int i = 0; i < N; ++i)
        {
            var task = _producerService.ProduceAsync<string>(topic, i.ToString(),null);
            // Track the task so WhenAll actually observes completion and faults;
            // otherwise the benchmark would only measure dispatch time.
            tasks.Add(task);
        }
        await Task.WhenAll(tasks);
    }
}
}