diff --git a/modules/JiShe.CollectBus.Kafka.Test/ConsoleApplicationBuilder.cs b/modules/JiShe.CollectBus.Kafka.Test/ConsoleApplicationBuilder.cs new file mode 100644 index 0000000..f6b0891 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/ConsoleApplicationBuilder.cs @@ -0,0 +1,72 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Test +{ + public class ConsoleApplicationBuilder: IApplicationBuilder + { + public IServiceProvider ApplicationServices { get; set; } + public IDictionary Properties { get; set; } = new Dictionary(); + + public IFeatureCollection ServerFeatures => throw new NotImplementedException(); + + private readonly List> _middlewares = new(); + + public IApplicationBuilder Use(Func middleware) + { + _middlewares.Add(middleware); + return this; + } + + public RequestDelegate Build() + { + RequestDelegate app = context => Task.CompletedTask; + foreach (var middleware in _middlewares) + { + app = middleware(app); + } + return app; + } + + public IApplicationBuilder New() + { + return new ConsoleApplicationBuilder + { + ApplicationServices = this.ApplicationServices, + Properties = new Dictionary(this.Properties) + }; + } + } + + + public static class HostBuilderExtensions + { + public static IHostBuilder ConfigureConsoleAppBuilder( + this IHostBuilder hostBuilder, + Action configure) + { + hostBuilder.ConfigureServices((context, services) => + { + // 注册 ConsoleApplicationBuilder 到 DI 容器 + services.AddSingleton(provider => + { + var appBuilder = new ConsoleApplicationBuilder + { + ApplicationServices = provider // 注入服务提供者 + }; + configure(appBuilder); // 执行配置委托 + return appBuilder; + }); + }); + return hostBuilder; + } + } +} diff --git a/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj b/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj new file mode 100644 index 0000000..db97b00 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj @@ -0,0 +1,40 @@ + + + + Exe + net8.0 + enable + enable + + + + + Always + true + PreserveNewest + + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs b/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs new file mode 100644 index 0000000..a8b8d93 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs @@ -0,0 +1,108 @@ +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Jobs; +using Confluent.Kafka; +using JiShe.CollectBus.Kafka.AdminClient; +using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Producer; + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Serilog; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka.Test +{ + [SimpleJob(RuntimeMoniker.Net80)] + //[SimpleJob(RuntimeMoniker.NativeAot80)] + [RPlotExporter] + public class KafkaProduceBenchmark + { + + // 每批消息数量 + [Params(1000, 10000, 100000)] + public int N; + public ServiceProvider _serviceProvider; + public IConsumerService _consumerService; + public IProducerService _producerService; + public string topic = "test-topic1"; + + [GlobalSetup] + public void Setup() + { + // 构建配置 + var config = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json") + .Build(); + // 直接读取配置项 + var greeting = config["ServerTagName"]; + Console.WriteLine(greeting); // 输出: Hello, World! + // 创建服务容器 + var services = new ServiceCollection(); + // 注册 IConfiguration 实例 + services.AddSingleton(config); + + // 初始化日志 + Log.Logger = new LoggerConfiguration() + .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置 + .CreateLogger(); + + // 配置日志系统 + services.AddLogging(logging => + { + logging.ClearProviders(); + logging.AddSerilog(); + }); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + // 构建ServiceProvider + _serviceProvider = services.BuildServiceProvider(); + + // 获取日志记录器工厂 + var loggerFactory = _serviceProvider.GetRequiredService(); + var logger = loggerFactory.CreateLogger(); + logger.LogInformation("程序启动"); + + var adminClientService = _serviceProvider.GetRequiredService(); + + + //await adminClientService.DeleteTopicAsync(topic); + // 创建 topic + adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter(); + + _consumerService = _serviceProvider.GetRequiredService(); + + _producerService = _serviceProvider.GetRequiredService(); + } + + [Benchmark] + public async Task UseAsync() + { + List tasks = new(); + for (int i = 0; i < N; ++i) + { + var task = _producerService.ProduceAsync(topic, i.ToString()); + tasks.Add(task); + } + await Task.WhenAll(tasks); + } + + [Benchmark] + public async Task UseLibrd() + { + List tasks = new(); + for (int i = 0; i < N; ++i) + { + var task = _producerService.ProduceAsync(topic, i.ToString(),null); + } + await Task.WhenAll(tasks); + } + } +} diff --git a/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs new file mode 100644 index 0000000..4c06e22 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/KafkaSubscribeTest.cs @@ -0,0 +1,68 @@ +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IotSystems.MessageReceiveds; +using JiShe.CollectBus.Kafka.Attributes; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Volo.Abp.Timing; + +namespace JiShe.CollectBus.Kafka.Test +{ + public class KafkaSubscribeTest: IKafkaSubscribe + { + [KafkaSubscribe(ProtocolConst.TESTTOPIC, EnableBatch=false,BatchSize=1000)] + + public async Task KafkaSubscribeAsync(object obj) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(obj)}"); + return SubscribeAck.Success(); + } + + + [KafkaSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] + public async Task LoginIssuedEvent(IssuedEventMessage issuedEventMessage) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + return SubscribeAck.Success(); + } + + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberHeartbeatIssuedEventName)] + public async Task HeartbeatIssuedEvent(IssuedEventMessage issuedEventMessage) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(issuedEventMessage)}"); + return SubscribeAck.Success(); + } + + [KafkaSubscribe(ProtocolConst.SubscriberReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] + public async Task ReceivedEvent(MessageReceived receivedMessage) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedMessage)}"); + return SubscribeAck.Success(); + } + + [KafkaSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberHeartbeatReceivedEventName)] + public async Task ReceivedHeartbeatEvent(MessageReceivedHeartbeat receivedHeartbeatMessage) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedHeartbeatMessage)}"); + return SubscribeAck.Success(); + } + + [KafkaSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + //[CapSubscribe(ProtocolConst.SubscriberLoginReceivedEventName)] + public async Task ReceivedLoginEvent(MessageReceivedLogin receivedLoginMessage) + { + Console.WriteLine($"收到订阅消息: {JsonSerializer.Serialize(receivedLoginMessage)}"); + return SubscribeAck.Success(); + } + } +} diff --git a/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Kafka.dll b/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Kafka.dll new file mode 100644 index 0000000..7ca63d6 Binary files /dev/null and b/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Kafka.dll differ diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs new file mode 100644 index 0000000..a359e14 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs @@ -0,0 +1,172 @@ +// See https://aka.ms/new-console-template for more information +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Running; +using Confluent.Kafka; +using DeviceDetectorNET.Parser.Device; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Kafka.AdminClient; +using JiShe.CollectBus.Kafka.Consumer; +using JiShe.CollectBus.Kafka.Producer; +using JiShe.CollectBus.Kafka.Test; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Serilog; +using System.Diagnostics; +using System.Reflection; +using System.Reflection.PortableExecutable; +using System.Text.Json; + +#region 基准测试 +//var summary = BenchmarkRunner.Run(); +//Console.WriteLine("压测完成"); +//return; +#endregion 基准测试 + + +var host = Host.CreateDefaultBuilder(args) + .ConfigureServices(services => + { + // 构建配置 + var config = new ConfigurationBuilder() + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json") + .Build(); + // 直接读取配置项 + var greeting = config["Kafka:ServerTagName"]; + Console.WriteLine(greeting); // 输出: Hello, World! + + + // 创建服务容器 + //var services = new ServiceCollection(); + // 注册 IConfiguration 实例 + services.AddSingleton(config); + + // 初始化日志 + Log.Logger = new LoggerConfiguration() + .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置 + .CreateLogger(); + + // 配置日志系统 + services.AddLogging(logging => + { + logging.ClearProviders(); + logging.AddSerilog(); + }); + services.Configure(config.GetSection("Kafka")); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddTransient(); + + }) + .ConfigureConsoleAppBuilder(appBuilder => + { + + }) + .Build(); + + + await host.StartAsync(); + var appBuilder = host.Services.GetRequiredService(); + appBuilder.ApplicationServices.UseKafkaSubscribe(); + + +// 构建ServiceProvider +//var serviceProvider = services.BuildServiceProvider(); + +// 获取日志记录器工厂 +var loggerFactory = host.Services.GetRequiredService(); +var logger = loggerFactory.CreateLogger(); +logger.LogInformation("程序启动"); +var adminClientService = host.Services.GetRequiredService(); +var configuration = host.Services.GetRequiredService(); +string topic = "test-topic"; +//await adminClientService.DeleteTopicAsync(topic); +// 创建 topic +//await adminClientService.CreateTopicAsync(topic, configuration.GetValue(CommonConst.NumPartitions), 3); + +var consumerService = host.Services.GetRequiredService(); +//var kafkaOptions = host.Services.GetRequiredService>(); +//await consumerService.SubscribeAsync(topic, (message) => +//{ +// try +// { +// logger.LogInformation($"消费消息:{message}"); +// return Task.FromResult(true); + +// } +// catch (ConsumeException ex) +// { +// // 处理消费错误 +// logger.LogError($"kafka消费异常:{ex.Message}"); +// } +// return Task.FromResult(false); +//}, "default"); + +//Stopwatch stopwatch = Stopwatch.StartNew(); + +//for (int i = 0; i < 3; i++) +//{ +// await consumerService.SubscribeBatchAsync(topic, (message) => +// { +// try +// { +// int index = 0; +// logger.LogInformation($"消费消息_{index}消费总数:{message.Count()}:{JsonSerializer.Serialize(message)}"); +// return Task.FromResult(true); + +// } +// catch (ConsumeException ex) +// { +// // 处理消费错误 +// logger.LogError($"kafka消费异常:{ex.Message}"); +// } +// return Task.FromResult(false); +// }); +//} +//stopwatch.Stop(); +//Console.WriteLine($"耗时: {stopwatch.ElapsedMilliseconds} 毫秒,{stopwatch.ElapsedMilliseconds/1000} 秒"); + +var producerService = host.Services.GetRequiredService(); +//int num = 840; +//while (num <= 900) +//{ +// //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); +// await producerService.ProduceAsync(topic, num.ToString()); +// num++; +//} +await Task.Factory.StartNew(async() => { + int num = 0; + while (true) + { + //await producerService.ProduceAsync(topic, new TestTopic { Topic = topic, Val = i }); + await producerService.ProduceAsync(topic, num.ToString()); + num++; + } +}); +Console.WriteLine("\n按Esc键退出"); +while (true) +{ + var key = Console.ReadKey(intercept: true); // intercept:true 隐藏按键显示 + + if (key.Key == ConsoleKey.Escape) + { + await host.StopAsync(); + Console.WriteLine("\n程序已退出"); + break; + } +} +(host.Services as IDisposable)?.Dispose(); + + +public class TestTopic +{ + public string Topic { get; set; } + public int Val { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json new file mode 100644 index 0000000..b2579c6 --- /dev/null +++ b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json @@ -0,0 +1,180 @@ +{ + "Serilog": { + "Using": [ + "Serilog.Sinks.Console", + "Serilog.Sinks.File" + ], + "MinimumLevel": { + "Default": "Information", + "Override": { + "Microsoft": "Warning", + "Volo.Abp": "Warning", + "Hangfire": "Warning", + "DotNetCore.CAP": "Warning", + "Serilog.AspNetCore": "Information", + "Microsoft.EntityFrameworkCore": "Warning", + "Microsoft.AspNetCore": "Warning" + } + }, + "WriteTo": [ + { + "Name": "Console" + }, + { + "Name": "File", + "Args": { + "path": "logs/logs-.txt", + "rollingInterval": "Day" + } + } + ] + }, + "App": { + "SelfUrl": "http://localhost:44315", + "CorsOrigins": "http://localhost:4200,http://localhost:3100" + }, + "ConnectionStrings": { + "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", + "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", + "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", + "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" + }, + "Redis": { + "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "MaxPoolSize": "50", + "DefaultDB": "14", + "HangfireDB": "15" + }, + "Jwt": { + "Audience": "JiShe.CollectBus", + "SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=", + "Issuer": "JiShe.CollectBus", + "ExpirationTime": 2 + }, + "HealthCheck": { + "IsEnable": true, + "MySql": { + "IsEnable": true + }, + "Pings": { + "IsEnable": true, + "Host": "https://www.baidu.com/", + "TimeOut": 5000 + } + }, + "SwaggerConfig": [ + { + "GroupName": "Basic", + "Title": "【后台管理】基础模块", + "Version": "V1" + }, + { + "GroupName": "Business", + "Title": "【后台管理】业务模块", + "Version": "V1" + } + ], + "Cap": { + "RabbitMq": { + "HostName": "118.190.144.92", + "UserName": "collectbus", + "Password": "123456", + "Port": 5672 + } + }, + "Kafka": { + "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", + "EnableFilter": true, + "EnableAuthorization": false, + "SecurityProtocol": "SaslPlaintext", + "SaslMechanism": "Plain", + "SaslUserName": "lixiao", + "SaslPassword": "lixiao1980", + "KafkaReplicationFactor": 3, + "NumPartitions": 1, + "ServerTagName": "JiSheCollectBus2" + //"Topic": { + // "ReplicationFactor": 3, + // "NumPartitions": 1000 + //} + }, + //"Kafka": { + // "Connections": { + // "Default": { + // "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092" + // // "SecurityProtocol": "SASL_PLAINTEXT", + // // "SaslMechanism": "PLAIN", + // // "SaslUserName": "lixiao", + // // "SaslPassword": "lixiao1980", + // } + // }, + // "Consumer": { + // "GroupId": "JiShe.CollectBus" + // }, + // "Producer": { + // "MessageTimeoutMs": 6000, + // "Acks": -1 + // }, + // "Topic": { + // "ReplicationFactor": 3, + // "NumPartitions": 1000 + // }, + // "EventBus": { + // "GroupId": "JiShe.CollectBus", + // "TopicName": "DefaultTopicName" + // } + //}, + "IoTDBOptions": { + "UserName": "root", + "Password": "root", + "ClusterList": [ "192.168.1.9:6667" ], + "PoolSize": 2, + "DataBaseName": "energy", + "OpenDebugMode": true, + "UseTableSessionPoolByDefault": false + }, + "ServerTagName": "JiSheCollectBus3", + "Cassandra": { + "ReplicationStrategy": { + "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下 + "DataCenters": [ + { + "Name": "dc1", + "ReplicationFactor": 3 + } + ] + }, + "Nodes": [ + { + "Host": "192.168.1.9", + "Port": 9042, + "DataCenter": "dc1", + "Rack": "RAC1" + }, + { + "Host": "192.168.1.9", + "Port": 9043, + "DataCenter": "dc1", + "Rack": "RAC2" + } + ], + "Username": "admin", + "Password": "lixiao1980", + "Keyspace": "jishecollectbus", + "ConsistencyLevel": "Quorum", + "PoolingOptions": { + "CoreConnectionsPerHost": 4, + "MaxConnectionsPerHost": 8, + "MaxRequestsPerConnection": 2000 + }, + "SocketOptions": { + "ConnectTimeoutMillis": 10000, + "ReadTimeoutMillis": 20000 + }, + "QueryOptions": { + "ConsistencyLevel": "Quorum", + "SerialConsistencyLevel": "Serial", + "DefaultIdempotence": true + } + } +} \ No newline at end of file