2025-04-18 09:29:38 +08:00
|
|
|
|
// See https://aka.ms/new-console-template for more information
|
2025-04-30 09:50:50 +08:00
|
|
|
|
using JiShe.CollectBus.Common;
|
2025-04-18 09:29:38 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Consts;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.AdminClient;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.Consumer;
|
2025-04-19 00:30:58 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Internal;
|
2025-04-18 09:29:38 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Producer;
|
|
|
|
|
|
using JiShe.CollectBus.Kafka.Test;
|
|
|
|
|
|
using Microsoft.AspNetCore.Builder;
|
|
|
|
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
|
|
|
|
using Microsoft.Extensions.Hosting;
|
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
|
using Microsoft.Extensions.Options;
|
|
|
|
|
|
using Serilog;
|
|
|
|
|
|
|
|
|
|
|
|
#region 基准测试
|
|
|
|
|
|
//var summary = BenchmarkRunner.Run<KafkaProduceBenchmark>();
|
|
|
|
|
|
//Console.WriteLine("压测完成");
|
|
|
|
|
|
//return;
|
|
|
|
|
|
#endregion 基准测试
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Builds the console host: loads appsettings.json, wires Serilog as the only
// logging provider, binds the Kafka/server option classes and registers the
// Kafka services used by the rest of this program.
var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        // Build a configuration from appsettings.json located in the current working directory.
        var config = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json")
            .Build();

        // Read one setting directly and echo it to the console.
        var serverTagName = config["Kafka:ServerTagName"];
        Console.WriteLine(serverTagName);

        // Register this configuration instance as the IConfiguration singleton.
        services.AddSingleton<IConfiguration>(config);

        // Initialise the static Serilog logger from the same configuration.
        Log.Logger = new LoggerConfiguration()
            .ReadFrom.Configuration(config)
            .CreateLogger();

        // Route Microsoft.Extensions.Logging exclusively through Serilog.
        services.AddLogging(logging =>
        {
            logging.ClearProviders();
            logging.AddSerilog();
        });

        // Options are bound through a delegate; the section-based
        // Configure<T>(IConfiguration) form was previously disabled here.
        var kafkaSection = config.GetSection("Kafka");
        services.Configure<KafkaOptionConfig>(options =>
        {
            kafkaSection.Bind(options);
        });
        services.Configure<ServerApplicationOptions>(options =>
        {
            config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
        });

        // Kafka services.
        services.AddSingleton<IAdminClientService, AdminClientService>();
        services.AddSingleton<IProducerService, ProducerService>();
        services.AddSingleton<IConsumerService, ConsumerService>();
        services.AddSingleton<KafkaPollyPipeline>();
        services.AddTransient<KafkaSubscribeTest>();
    })
    // NOTE(review): project extension; presumably it registers the
    // IApplicationBuilder that is resolved after startup — confirm.
    .ConfigureConsoleAppBuilder(appBuilder =>
    {
    })
    .Build();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Start the host before touching any of its services.
await host.StartAsync();

// Activate the Kafka subscriptions through the application builder's service provider.
host.Services
    .GetRequiredService<IApplicationBuilder>()
    .ApplicationServices
    .UseKafkaSubscribe();

// Create the program-level logger from the registered logger factory and record startup.
var logger = host.Services
    .GetRequiredService<ILoggerFactory>()
    .CreateLogger<Program>();
logger.LogInformation("程序启动");
|
2025-04-30 09:50:50 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Sanity check that the Polly pipeline is registered.
// GetService returns null for an unregistered type, whereas GetRequiredService
// throws, which made the original null check unreachable.
var kafkaPollyPipeline = host.Services.GetService<KafkaPollyPipeline>();
if (kafkaPollyPipeline is null)
{
    // A missing registration is a configuration problem, so log it above Information level.
    logger.LogWarning("KafkaPollyPipeline未注册!");
}
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-04-18 09:29:38 +08:00
|
|
|
|
// Resolve the Kafka services and settings from the host's container.
// The resolution order is kept as-is.
var rootProvider = host.Services;
var adminClientService = rootProvider.GetRequiredService<IAdminClientService>();
var configuration = rootProvider.GetRequiredService<IConfiguration>();
var serverApplicationOptions = rootProvider.GetRequiredService<IOptions<ServerApplicationOptions>>();

// Topic used by the producer loop and by the disabled consumer snippets.
string topic = ProtocolConst.TESTTOPIC;

// Optional topic reset, currently disabled:
//await adminClientService.DeleteTopicAsync(topic);
// Create the topic
//await adminClientService.CreateTopicAsync(topic, configuration.GetValue<int>(CommonConst.NumPartitions), 3);

var consumerService = rootProvider.GetRequiredService<IConsumerService>();
var producerService = rootProvider.GetRequiredService<IProducerService>();
|
2025-04-18 09:29:38 +08:00
|
|
|
|
//var kafkaOptions = host.Services.GetRequiredService<IOptions<KafkaOptionConfig>>();
|
|
|
|
|
|
//await consumerService.SubscribeAsync<object>(topic, (message) =>
|
|
|
|
|
|
//{
|
|
|
|
|
|
// try
|
|
|
|
|
|
// {
|
|
|
|
|
|
// logger.LogInformation($"消费消息:{message}");
|
|
|
|
|
|
// return Task.FromResult(true);
|
|
|
|
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
// catch (ConsumeException ex)
|
|
|
|
|
|
// {
|
|
|
|
|
|
// // 处理消费错误
|
|
|
|
|
|
// logger.LogError($"kafka消费异常:{ex.Message}");
|
|
|
|
|
|
// }
|
|
|
|
|
|
// return Task.FromResult(false);
|
|
|
|
|
|
//}, "default");
|
|
|
|
|
|
|
|
|
|
|
|
//Stopwatch stopwatch = Stopwatch.StartNew();
|
|
|
|
|
|
|
|
|
|
|
|
//for (int i = 0; i < 3; i++)
|
|
|
|
|
|
//{
|
2025-04-19 00:30:58 +08:00
|
|
|
|
//await consumerService.SubscribeBatchAsync<dynamic>(topic, (message) =>
|
|
|
|
|
|
//{
|
|
|
|
|
|
// try
|
2025-04-18 09:29:38 +08:00
|
|
|
|
// {
|
2025-04-19 00:30:58 +08:00
|
|
|
|
// int index = 0;
|
|
|
|
|
|
// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}");
|
|
|
|
|
|
// return Task.FromResult(true);
|
|
|
|
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
// catch (ConsumeException ex)
|
|
|
|
|
|
// {
|
|
|
|
|
|
// // 处理消费错误
|
|
|
|
|
|
// logger.LogError($"kafka消费异常:{ex.Message}");
|
|
|
|
|
|
// }
|
|
|
|
|
|
// return Task.FromResult(false);
|
|
|
|
|
|
//});
|
2025-04-18 09:29:38 +08:00
|
|
|
|
//}
|
|
|
|
|
|
//stopwatch.Stop();
|
|
|
|
|
|
//Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒");
|
|
|
|
|
|
|
2025-04-19 00:30:58 +08:00
|
|
|
|
|
|
|
|
|
|
// Publish six TestTopic messages (Val = 1..6) to the test topic, awaiting each send in turn.
for (var num = 1; num <= 6; num++)
{
    var payload = new TestTopic { Topic = topic, Val = num };
    await producerService.ProduceAsync<TestTopic>(topic, payload);
}
|
|
|
|
|
|
|
|
|
|
|
|
//int num = 2;
|
|
|
|
|
|
//while (num <= 4)
|
2025-04-18 09:29:38 +08:00
|
|
|
|
//{
|
|
|
|
|
|
// await producerService.ProduceAsync<string>(topic, num.ToString());
|
|
|
|
|
|
// num++;
|
|
|
|
|
|
//}
|
2025-04-19 00:30:58 +08:00
|
|
|
|
//await Task.Factory.StartNew(async() => {
|
|
|
|
|
|
// int num = 0;
|
|
|
|
|
|
// while (true)
|
|
|
|
|
|
// {
|
|
|
|
|
|
// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i });
|
|
|
|
|
|
// await producerService.ProduceAsync<string>(topic, num.ToString());
|
|
|
|
|
|
// num++;
|
|
|
|
|
|
// }
|
|
|
|
|
|
//});
|
2025-04-18 09:29:38 +08:00
|
|
|
|
Console.WriteLine("\n按Esc键退出");

// Block until Esc is pressed; intercept: true keeps the pressed keys from being echoed.
while (Console.ReadKey(intercept: true).Key != ConsoleKey.Escape)
{
}

await host.StopAsync();
Console.WriteLine("\n程序已退出");

// Dispose the host itself rather than only its service provider, then flush
// the static Serilog logger so buffered log events are written before exit.
host.Dispose();
Log.CloseAndFlush();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Message payload that this program produces to the Kafka test topic.
/// </summary>
public class TestTopic
{
    /// <summary>Name of the topic the message is sent to.</summary>
    public string Topic { get; set; }

    /// <summary>Integer value carried by the message; the producer loop sets it to its counter.</summary>
    public int Val { get; set; }
}
|