diff --git a/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj b/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj
index db97b00..c312740 100644
--- a/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj
+++ b/modules/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj
@@ -31,10 +31,14 @@
-
+
+
diff --git a/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs b/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs
index a8b8d93..4152a33 100644
--- a/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs
+++ b/modules/JiShe.CollectBus.Kafka.Test/KafkaProduceBenchmark.cs
@@ -1,8 +1,10 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using Confluent.Kafka;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Consumer;
+using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration;
@@ -24,11 +26,11 @@ namespace JiShe.CollectBus.Kafka.Test
{
// 每批消息数量
- [Params(1000, 10000, 100000)]
+ [Params(1000, 10000, 100000, 1000000)]
public int N;
public ServiceProvider _serviceProvider;
- public IConsumerService _consumerService;
- public IProducerService _producerService;
+ public IConsumerService _consumerService;
+ public IProducerService _producerService;
public string topic = "test-topic1";
[GlobalSetup]
@@ -40,13 +42,22 @@ namespace JiShe.CollectBus.Kafka.Test
.AddJsonFile("appsettings.json")
.Build();
// 直接读取配置项
- var greeting = config["ServerTagName"];
+ var greeting = config["Kafka:ServerTagName"];
Console.WriteLine(greeting); // 输出: Hello, World!
// 创建服务容器
var services = new ServiceCollection();
// 注册 IConfiguration 实例
services.AddSingleton(config);
+ services.Configure(options =>
+ {
+ config.GetSection("Kafka").Bind(options);
+ });
+ services.Configure(options =>
+ {
+ config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
+ });
+
// 初始化日志
Log.Logger = new LoggerConfiguration()
.ReadFrom.Configuration(config) // 从 appsettings.json 读取配置
@@ -61,6 +72,8 @@ namespace JiShe.CollectBus.Kafka.Test
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+ services.AddSingleton();
+ services.AddTransient();
// 构建ServiceProvider
_serviceProvider = services.BuildServiceProvider();
@@ -72,10 +85,10 @@ namespace JiShe.CollectBus.Kafka.Test
var adminClientService = _serviceProvider.GetRequiredService();
-
+
//await adminClientService.DeleteTopicAsync(topic);
// 创建 topic
- adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter();
+ //adminClientService.CreateTopicAsync(topic, 3, 3).ConfigureAwait(false).GetAwaiter();
_consumerService = _serviceProvider.GetRequiredService();
@@ -100,9 +113,9 @@ namespace JiShe.CollectBus.Kafka.Test
List tasks = new();
for (int i = 0; i < N; ++i)
{
- var task = _producerService.ProduceAsync(topic, i.ToString(),null);
+ var task = _producerService.ProduceAsync(topic, i.ToString(), null);
}
await Task.WhenAll(tasks);
}
- }
+ }
}
diff --git a/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Common.dll b/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Common.dll
new file mode 100644
index 0000000..5d9bd8d
Binary files /dev/null and b/modules/JiShe.CollectBus.Kafka.Test/Lib/JiShe.CollectBus.Common.dll differ
diff --git a/modules/JiShe.CollectBus.Kafka.Test/Program.cs b/modules/JiShe.CollectBus.Kafka.Test/Program.cs
index 3c99810..f1f9cfd 100644
--- a/modules/JiShe.CollectBus.Kafka.Test/Program.cs
+++ b/modules/JiShe.CollectBus.Kafka.Test/Program.cs
@@ -1,8 +1,5 @@
// See https://aka.ms/new-console-template for more information
-using BenchmarkDotNet.Configs;
-using BenchmarkDotNet.Running;
-using Confluent.Kafka;
-using DeviceDetectorNET.Parser.Device;
+using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.AdminClient;
@@ -17,10 +14,6 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Serilog;
-using System.Diagnostics;
-using System.Reflection;
-using System.Reflection.PortableExecutable;
-using System.Text.Json;
#region 基准测试
//var summary = BenchmarkRunner.Run();
@@ -58,7 +51,18 @@ var host = Host.CreateDefaultBuilder(args)
logging.ClearProviders();
logging.AddSerilog();
});
- services.Configure(config.GetSection("Kafka"));
+ //services.Configure(config.GetSection("Kafka"));
+ //services.Configure(config.GetSection("ServerApplicationOptions"));
+ var dss = config.GetSection("Kafka");
+
+ services.Configure(options =>
+ {
+ config.GetSection("Kafka").Bind(options);
+ });
+ services.Configure(options =>
+ {
+ config.GetSection(nameof(ServerApplicationOptions)).Bind(options);
+ });
services.AddSingleton();
services.AddSingleton();
@@ -86,8 +90,20 @@ var host = Host.CreateDefaultBuilder(args)
var loggerFactory = host.Services.GetRequiredService();
var logger = loggerFactory.CreateLogger();
logger.LogInformation("程序启动");
+
+
+var _kafkaPollyPipeline = host.Services.GetRequiredService();
+if (_kafkaPollyPipeline == null)
+{
+ logger.LogInformation("KafkaPollyPipeline未注册!");
+}
+
+
var adminClientService = host.Services.GetRequiredService();
var configuration = host.Services.GetRequiredService();
+
+var kafkaOptionConfig=host.Services.GetRequiredService>();
+
string topic = ProtocolConst.TESTTOPIC;
//await adminClientService.DeleteTopicAsync(topic);
// 创建 topic
diff --git a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json
index 9767dee..e024316 100644
--- a/modules/JiShe.CollectBus.Kafka.Test/appsettings.json
+++ b/modules/JiShe.CollectBus.Kafka.Test/appsettings.json
@@ -13,7 +13,8 @@
"DotNetCore.CAP": "Warning",
"Serilog.AspNetCore": "Information",
"Microsoft.EntityFrameworkCore": "Warning",
- "Microsoft.AspNetCore": "Warning"
+ "Microsoft.AspNetCore": "Warning",
+ "Microsoft.AspNetCore.Diagnostics.HealthChecks": "Warning"
}
},
"WriteTo": [
@@ -34,7 +35,7 @@
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
},
"ConnectionStrings": {
- "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
+ "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
@@ -43,7 +44,7 @@
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"MaxPoolSize": "50",
"DefaultDB": "14",
- "HangfireDB": "15"
+ "HangfireDB": "13"
},
"Jwt": {
"Audience": "JiShe.CollectBus",
@@ -51,16 +52,11 @@
"Issuer": "JiShe.CollectBus",
"ExpirationTime": 2
},
- "HealthCheck": {
+ "HealthChecks": {
"IsEnable": true,
- "MySql": {
- "IsEnable": true
- },
- "Pings": {
- "IsEnable": true,
- "Host": "https://www.baidu.com/",
- "TimeOut": 5000
- }
+ "HealthCheckDatabaseName": "HealthChecks",
+ "EvaluationTimeInSeconds": 10,
+ "MinimumSecondsBetweenFailureNotifications": 60
},
"SwaggerConfig": [
{
@@ -74,14 +70,6 @@
"Version": "V1"
}
],
- "Cap": {
- "RabbitMq": {
- "HostName": "118.190.144.92",
- "UserName": "collectbus",
- "Password": "123456",
- "Port": 5672
- }
- },
"Kafka": {
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
"EnableFilter": true,
@@ -92,48 +80,18 @@
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
- "ServerTagName": "JiSheCollectBus99"
- //"Topic": {
- // "ReplicationFactor": 3,
- // "NumPartitions": 1000
- //}
+ "ServerTagName": "JiSheCollectBus4",
+ "FirstCollectionTime": "2025-04-22 16:07:00"
},
- //"Kafka": {
- // "Connections": {
- // "Default": {
- // "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
- // // "SecurityProtocol": "SASL_PLAINTEXT",
- // // "SaslMechanism": "PLAIN",
- // // "SaslUserName": "lixiao",
- // // "SaslPassword": "lixiao1980",
- // }
- // },
- // "Consumer": {
- // "GroupId": "JiShe.CollectBus"
- // },
- // "Producer": {
- // "MessageTimeoutMs": 6000,
- // "Acks": -1
- // },
- // "Topic": {
- // "ReplicationFactor": 3,
- // "NumPartitions": 1000
- // },
- // "EventBus": {
- // "GroupId": "JiShe.CollectBus",
- // "TopicName": "DefaultTopicName"
- // }
- //},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
"ClusterList": [ "192.168.1.9:6667" ],
- "PoolSize": 2,
+ "PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
- "ServerTagName": "JiSheCollectBus3",
"Cassandra": {
"ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下
@@ -156,6 +114,12 @@
"Port": 9043,
"DataCenter": "dc1",
"Rack": "RAC2"
+ },
+ {
+ "Host": "192.168.1.9",
+ "Port": 9044,
+ "DataCenter": "dc1",
+ "Rack": "RAC2"
}
],
"Username": "admin",
@@ -176,5 +140,17 @@
"SerialConsistencyLevel": "Serial",
"DefaultIdempotence": true
}
- }
+ },
+ "ServerApplicationOptions": {
+ "ServerTagName": "JiSheCollectBus4",
+ "SystemType": "Energy",
+ "FirstCollectionTime": "2025-04-28 15:07:00",
+ "AutomaticVerificationTime": "16:07:00",
+ "AutomaticTerminalVersionTime": "17:07:00",
+ "AutomaticTelematicsModuleTime": "17:30:00",
+ "AutomaticDayFreezeTime": "02:30:00",
+ "AutomaticMonthFreezeTime": "03:30:00",
+ "DefaultProtocolPlugin": "T37612012ProtocolPlugin"
+ },
+ "PlugInFolder": ""
}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs
index c467921..9750f86 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs
@@ -1,16 +1,8 @@
using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
-using Polly;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Polly.Contrib.WaitAndRetry;
-using Volo.Abp.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using JiShe.CollectBus.Kafka.Producer;
namespace JiShe.CollectBus.Kafka.Internal
{
diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
index 3c50aae..a0438db 100644
--- a/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
+++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
@@ -137,13 +137,11 @@ namespace JiShe.CollectBus.Kafka
foreach (var sub in subscribedMethods)
{
- int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
-#if DEBUG
+ int partitionCount = sub.Attribute!.TaskCount==-1?3: sub.Attribute!.TaskCount;// kafkaOptionConfig.NumPartitions;
var adminClientService = provider.GetRequiredService();
int topicCount = adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic);
partitionCount = partitionCount > topicCount ? topicCount : partitionCount;
-#endif
- //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
+ //partitionCount = sub.Attribute!.TaskCount == -1 ? adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)
diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
index 1313fab..7dec01c 100644
--- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
+++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs
@@ -96,7 +96,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (taskData != null)
{
// 更新
- meter.Timestamps = taskData.Timestamps;
+ meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived=true;
taskData.ReceivedMessageHexString= analysisBaseDto.HexMessage;
taskData.ReceivedMessageId= analysisBaseDto.MessageId ?? string.Empty;
@@ -197,7 +197,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData
if (taskData != null)
{
// 更新
- meter.Timestamps = taskData.Timestamps;
+ meter.Timestamps = taskData.PendingCopyReadTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
taskData.IsReceived = true;
taskData.ReceivedMessageHexString = analysisBaseDto.HexMessage;
taskData.ReceivedMessageId = analysisBaseDto.MessageId ?? string.Empty;