using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Jobs; using Confluent.Kafka; using JiShe.CollectBus.Common; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Serilog; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Test { [SimpleJob(RuntimeMoniker.Net80)] //[SimpleJob(RuntimeMoniker.NativeAot80)] [RPlotExporter] public class KafkaProduceBenchmark { // 每批消息数量 [Params(1000, 10000, 100000, 1000000)] public int N; public ServiceProvider _serviceProvider; public IConsumerService _consumerService; public IProducerService _producerService; public string topic = "test-topic1"; [GlobalSetup] public void Setup() { // 构建配置 var config = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json") .Build(); // 直接读取配置项 var greeting = config["Kafka:ServerTagName"]; Console.WriteLine(greeting); // 输出: Hello, World! // 创建服务容器 var services = new ServiceCollection(); // 注册 IConfiguration 实例 services.AddSingleton(config); services.Configure(options => { config.GetSection("Kafka").Bind(options); }); services.Configure(options => { config.GetSection(nameof(ServerApplicationOptions)).Bind(options); }); // 初始化日志 Log.Logger = new LoggerConfiguration() .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置 .CreateLogger(); // 配置日志系统 services.AddLogging(logging => { logging.ClearProviders(); logging.AddSerilog(); }); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddTransient(); // 构建ServiceProvider _serviceProvider = services.BuildServiceProvider(); // 获取日志记录器工厂 var loggerFactory = _serviceProvider.GetRequiredService(); var logger = loggerFactory.CreateLogger(); logger.LogInformation("程序启动"); var adminClientService = _serviceProvider.GetRequiredService(); //await adminClientService.DeleteTopicAsync(topic); // 创建 topic //adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter(); _consumerService = _serviceProvider.GetRequiredService(); _producerService = _serviceProvider.GetRequiredService(); } [Benchmark] public async Task UseAsync() { List tasks = new(); for (int i = 0; i < N; ++i) { var task = _producerService.ProduceAsync(topic, i.ToString()); tasks.Add(task); } await Task.WhenAll(tasks); } [Benchmark] public async Task UseLibrd() { List tasks = new(); for (int i = 0; i < N; ++i) { var task = _producerService.ProduceAsync(topic, i.ToString(), null); } await Task.WhenAll(tasks); } } }