197 lines
6.0 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// See https://aka.ms/new-console-template for more information
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Kafka.Test;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Serilog;
#region
//var summary = BenchmarkRunner.Run<KafkaProduceBenchmark>();
//Console.WriteLine("压测完成");
//return;
#endregion
// Build the generic host for this Kafka test console: load appsettings.json,
// route logging through Serilog, bind the option classes and register the
// Kafka services used by the statements that follow.
var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        // Build a configuration from appsettings.json in the current working directory.
        var config = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json")
            .Build();

        // Read one setting directly and echo it, as a quick check that the file was loaded.
        var serverTagName = config["ServerApplicationOptions:ServerTagName"];
        Console.WriteLine(serverTagName);

        // Create the service container (superseded by the host builder's collection).
        //var services = new ServiceCollection();

        // Register this IConfiguration instance with the container.
        services.AddSingleton<IConfiguration>(config);

        // Initialise the static Serilog logger from the settings in appsettings.json.
        Log.Logger = new LoggerConfiguration()
            .ReadFrom.Configuration(config)
            .CreateLogger();

        // Replace the default logging providers with Serilog.
        services.AddLogging(logging =>
        {
            logging.ClearProviders();
            logging.AddSerilog();
        });

        //services.Configure<KafkaOptionConfig>(config.GetSection("Kafka"));
        //services.Configure<ServerApplicationOptions>(config.GetSection("ServerApplicationOptions"));

        // Bind the option classes from their configuration sections.
        services.Configure<KafkaOptionConfig>(options =>
        {
            config.GetSection("Kafka").Bind(options);
        });
        services.Configure<ServerApplicationOptions>(options =>
        {
            config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
        });

        // Kafka services exercised by this program.
        services.AddSingleton<IAdminClientService, AdminClientService>();
        services.AddSingleton<IProducerService, ProducerService>();
        services.AddSingleton<IConsumerService, ConsumerService>();
        services.AddSingleton<KafkaPollyPipeline>();
        services.AddTransient<KafkaSubscribeTest>();
    })
    // Project-specific extension; no additional application-builder setup is needed here.
    .ConfigureConsoleAppBuilder(_ =>
    {
    })
    .Build();
// Start the host, publish a handful of test messages to Kafka, then wait for
// Esc before shutting down. The try/finally ensures the host is disposed and
// buffered Serilog events are flushed even when startup or producing throws.
try
{
    await host.StartAsync();

    // Invoke the project's UseKafkaSubscribe extension on the application's service provider.
    // NOTE(review): presumably this wires up the Kafka subscribers — confirm against the extension.
    var applicationBuilder = host.Services.GetRequiredService<IApplicationBuilder>();
    applicationBuilder.ApplicationServices.UseKafkaSubscribe();

    // Build the ServiceProvider (superseded by the host's provider).
    //var serviceProvider = services.BuildServiceProvider();

    // Obtain a logger from the logger factory.
    var loggerFactory = host.Services.GetRequiredService<ILoggerFactory>();
    var logger = loggerFactory.CreateLogger<Program>();
    logger.LogInformation("程序启动");

    // GetRequiredService throws when the service is not registered, so resolving it
    // here is already the registration check; a null test afterwards is unreachable.
    _ = host.Services.GetRequiredService<KafkaPollyPipeline>();

    // Resolved up front; adminClientService, configuration and consumerService are only
    // referenced by the commented-out scenarios below.
    var adminClientService = host.Services.GetRequiredService<IAdminClientService>();
    var configuration = host.Services.GetRequiredService<IConfiguration>();
    string topic = ProtocolConst.TESTTOPIC;

    //await adminClientService.DeleteTopicAsync(topic);
    // Create the topic
    //await adminClientService.CreateTopicAsync(topic, configuration.GetValue<int>(CommonConst.NumPartitions), 3);

    var consumerService = host.Services.GetRequiredService<IConsumerService>();
    var producerService = host.Services.GetRequiredService<IProducerService>();
    //var kafkaOptions = host.Services.GetRequiredService<IOptions<KafkaOptionConfig>>();

    //await consumerService.SubscribeAsync<object>(topic, (message) =>
    //{
    //    try
    //    {
    //        logger.LogInformation($"消费消息:{message}");
    //        return Task.FromResult(true);
    //    }
    //    catch (ConsumeException ex)
    //    {
    //        // Handle the consume error
    //        logger.LogError($"kafka消费异常:{ex.Message}");
    //    }
    //    return Task.FromResult(false);
    //}, "default");

    //Stopwatch stopwatch = Stopwatch.StartNew();
    //for (int i = 0; i < 3; i++)
    //{
    //await consumerService.SubscribeBatchAsync<dynamic>(topic, (message) =>
    //{
    //    try
    //    {
    //        int index = 0;
    //        logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}{JsonSerializer.Serialize(message)}");
    //        return Task.FromResult(true);
    //    }
    //    catch (ConsumeException ex)
    //    {
    //        // Handle the consume error
    //        logger.LogError($"kafka消费异常:{ex.Message}");
    //    }
    //    return Task.FromResult(false);
    //});
    //}
    //stopwatch.Stop();
    //Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒");

    // Publish six test messages, with Val running from 1 to 6, one at a time.
    for (int num = 1; num <= 6; num++)
    {
        await producerService.ProduceAsync<TestTopic>(topic, new TestTopic { Topic = topic, Val = num });
    }

    //int num = 2;
    //while (num <= 4)
    //{
    //    await producerService.ProduceAsync<string>(topic, num.ToString());
    //    num++;
    //}

    //await Task.Factory.StartNew(async() => {
    //    int num = 0;
    //    while (true)
    //    {
    //        //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i });
    //        await producerService.ProduceAsync<string>(topic, num.ToString());
    //        num++;
    //    }
    //});

    Console.WriteLine("\n按Esc键退出");

    // Keep the process alive until Esc is pressed, then stop the host.
    while (true)
    {
        // intercept: true keeps the pressed key from being echoed to the console.
        var key = Console.ReadKey(intercept: true);
        if (key.Key == ConsoleKey.Escape)
        {
            await host.StopAsync();
            Console.WriteLine("\n程序已退出");
            break;
        }
    }
}
finally
{
    // Disposing the host also disposes its service provider.
    host.Dispose();

    // Flush any buffered Serilog events before the process exits.
    Log.CloseAndFlush();
}
/// <summary>
/// Test message payload published to Kafka by this program.
/// </summary>
public class TestTopic
{
    /// <summary>
    /// Name of the topic the message is sent to. Defaults to an empty string so the
    /// non-nullable property is never observed as <c>null</c> on a fresh instance.
    /// </summary>
    public string Topic { get; set; } = string.Empty;

    /// <summary>
    /// Integer value carried by the message.
    /// </summary>
    public int Val { get; set; }
}