// See https://aka.ms/new-console-template for more information using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Test; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Serilog; #region 基准测试 //var summary = BenchmarkRunner.Run(); //Console.WriteLine("压测完成"); //return; #endregion 基准测试 var host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // 构建配置 var config = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json") .Build(); // 直接读取配置项 var greeting = config["Kafka:ServerTagName"]; Console.WriteLine(greeting); // 输出: Hello, World! // 创建服务容器 //var services = new ServiceCollection(); // 注册 IConfiguration 实例 services.AddSingleton(config); // 初始化日志 Log.Logger = new LoggerConfiguration() .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置 .CreateLogger(); // 配置日志系统 services.AddLogging(logging => { logging.ClearProviders(); logging.AddSerilog(); }); //services.Configure(config.GetSection("Kafka")); //services.Configure(config.GetSection("ServerApplicationOptions")); var dss = config.GetSection("Kafka"); services.Configure(options => { config.GetSection("Kafka").Bind(options); }); services.Configure(options => { config.GetSection(nameof(ServerApplicationOptions)).Bind(options); }); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddTransient(); }) .ConfigureConsoleAppBuilder(appBuilder => { }) .Build(); await host.StartAsync(); var appBuilder = host.Services.GetRequiredService(); appBuilder.ApplicationServices.UseKafkaSubscribe(); // 构建ServiceProvider //var serviceProvider = services.BuildServiceProvider(); // 获取日志记录器工厂 var loggerFactory = host.Services.GetRequiredService(); var logger = loggerFactory.CreateLogger(); logger.LogInformation("程序启动"); var _kafkaPollyPipeline = host.Services.GetRequiredService(); if (_kafkaPollyPipeline == null) { logger.LogInformation("KafkaPollyPipeline未注册!"); } var adminClientService = host.Services.GetRequiredService(); var configuration = host.Services.GetRequiredService(); var kafkaOptionConfig=host.Services.GetRequiredService>(); string topic = ProtocolConst.TESTTOPIC; //await adminClientService.DeleteTopicAsync(topic); // 创建 topic //await adminClientService.CreateTopicAsync(topic, configuration.GetValue(CommonConst.NumPartitions), 3); var consumerService = host.Services.GetRequiredService(); var producerService = host.Services.GetRequiredService(); //var kafkaOptions = host.Services.GetRequiredService>(); //await consumerService.SubscribeAsync(topic, (message) => //{ // try // { // logger.LogInformation($"消费消息:{message}"); // return Task.FromResult(true); // } // catch (ConsumeException ex) // { // // 处理消费错误 // logger.LogError($"kafka消费异常:{ex.Message}"); // } // return Task.FromResult(false); //}, "default"); //Stopwatch stopwatch = Stopwatch.StartNew(); //for (int i = 0; i < 3; i++) //{ //await consumerService.SubscribeBatchAsync(topic, (message) => //{ // try // { // int index = 0; // logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); // return Task.FromResult(true); // } // catch (ConsumeException ex) // { // // 处理消费错误 // logger.LogError($"kafka消费异常:{ex.Message}"); // } // return Task.FromResult(false); //}); //} //stopwatch.Stop(); //Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒"); int num = 1; while (num <= 6) { await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = num }); num++; } //int num = 2; //while (num <= 4) //{ // await producerService.ProduceAsync(topic, num.ToString()); // num++; //} //await Task.Factory.StartNew(async() => { // int num = 0; // while (true) // { // //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); // await producerService.ProduceAsync(topic, num.ToString()); // num++; // } //}); Console.WriteLine("\n按Esc键退出"); while (true) { var key = Console.ReadKey(intercept: true); // intercept:true 隐藏按键显示 if (key.Key == ConsoleKey.Escape) { await host.StopAsync(); Console.WriteLine("\n程序已退出"); break; } } (host.Services as IDisposable)?.Dispose(); public class TestTopic { public string Topic { get; set; } public int Val { get; set; } }