diff --git a/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj b/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj
new file mode 100644
index 0000000..c7e7af1
--- /dev/null
+++ b/JiShe.CollectBus.Kafka.Test/JiShe.CollectBus.Kafka.Test.csproj
@@ -0,0 +1,31 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+ Always
+ true
+ PreserveNewest
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/JiShe.CollectBus.Kafka.Test/Program.cs b/JiShe.CollectBus.Kafka.Test/Program.cs
new file mode 100644
index 0000000..36abdb6
--- /dev/null
+++ b/JiShe.CollectBus.Kafka.Test/Program.cs
@@ -0,0 +1,79 @@
+// See https://aka.ms/new-console-template for more information
+using Confluent.Kafka;
+using JiShe.CollectBus.Kafka.AdminClient;
+using JiShe.CollectBus.Kafka.Consumer;
+using JiShe.CollectBus.Kafka.Producer;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Serilog;
+using System.Text.Json;
+using static Confluent.Kafka.ConfigPropertyNames;
+
+
+
+// 构建配置
+var config = new ConfigurationBuilder()
+ .SetBasePath(Directory.GetCurrentDirectory())
+ .AddJsonFile("appsettings.json")
+ .Build();
+// 直接读取配置项
+var greeting = config["ServerTagName"];
+Console.WriteLine(greeting); // 输出: Hello, World!
+
+
+// 创建服务容器
+var services = new ServiceCollection();
+// 注册 IConfiguration 实例
+services.AddSingleton(config);
+
+// 初始化日志
+Log.Logger = new LoggerConfiguration()
+ .ReadFrom.Configuration(config) // 从 appsettings.json 读取配置
+ .CreateLogger();
+
+// 配置日志系统
+services.AddLogging(logging =>
+{
+ logging.ClearProviders();
+ logging.AddSerilog();
+});
+services.AddSingleton();
+services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
+//services.AddSingleton(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
+
+// 构建ServiceProvider
+var serviceProvider = services.BuildServiceProvider();
+
+// 获取日志记录器工厂
+var loggerFactory = serviceProvider.GetRequiredService();
+var logger = loggerFactory.CreateLogger();
+logger.LogInformation("程序启动");
+
+var adminClientService = serviceProvider.GetRequiredService();
+
+string topic = "test-topic";
+//await adminClientService.DeleteTopicAsync(topic);
+// 创建 topic
+await adminClientService.CreateTopicAsync(topic, 3, 3);
+
+
+var producerService = serviceProvider.GetRequiredService>();
+int i = 1;
+while (i <= 10)
+{
+ await producerService.ProduceAsync(topic, JsonSerializer.Serialize(new { topic = topic, val = i }));
+ i++;
+}
+
+while (true)
+{
+ var key = Console.ReadKey(intercept: true); // intercept:true 隐藏按键显示
+
+ if (key.Key == ConsoleKey.Escape)
+ {
+ Console.WriteLine("\n程序已退出");
+ break;
+ }
+}
+ (serviceProvider as IDisposable)?.Dispose();
diff --git a/JiShe.CollectBus.Kafka.Test/appsettings.json b/JiShe.CollectBus.Kafka.Test/appsettings.json
new file mode 100644
index 0000000..55c8ef6
--- /dev/null
+++ b/JiShe.CollectBus.Kafka.Test/appsettings.json
@@ -0,0 +1,134 @@
+{
+ "Serilog": {
+ "Using": [
+ "Serilog.Sinks.Console",
+ "Serilog.Sinks.File"
+ ],
+ "MinimumLevel": {
+ "Default": "Information",
+ "Override": {
+ "Microsoft": "Warning",
+ "Volo.Abp": "Warning",
+ "Hangfire": "Warning",
+ "DotNetCore.CAP": "Warning",
+ "Serilog.AspNetCore": "Information",
+ "Microsoft.EntityFrameworkCore": "Warning",
+ "Microsoft.AspNetCore": "Warning"
+ }
+ },
+ "WriteTo": [
+ {
+ "Name": "Console"
+ },
+ {
+ "Name": "File",
+ "Args": {
+ "path": "./logs/logs-.txt",
+ "rollingInterval": "Day"
+ }
+ }
+ ]
+ },
+ "App": {
+ "SelfUrl": "http://localhost:44315",
+ "CorsOrigins": "http://localhost:4200,http://localhost:3100"
+ },
+ "ConnectionStrings": {
+ "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
+ "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
+ "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
+ "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
+ },
+ "Redis": {
+ "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
+ "DefaultDB": "14",
+ "HangfireDB": "15"
+ },
+ "Jwt": {
+ "Audience": "JiShe.CollectBus",
+ "SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
+ "Issuer": "JiShe.CollectBus",
+ "ExpirationTime": 2
+ },
+ "HealthCheck": {
+ "IsEnable": true,
+ "MySql": {
+ "IsEnable": true
+ },
+ "Pings": {
+ "IsEnable": true,
+ "Host": "https://www.baidu.com/",
+ "TimeOut": 5000
+ }
+ },
+ "SwaggerConfig": [
+ {
+ "GroupName": "Basic",
+ "Title": "【后台管理】基础模块",
+ "Version": "V1"
+ },
+ {
+ "GroupName": "Business",
+ "Title": "【后台管理】业务模块",
+ "Version": "V1"
+ }
+ ],
+ "Cap": {
+ "RabbitMq": {
+ "HostName": "118.190.144.92",
+ "UserName": "collectbus",
+ "Password": "123456",
+ "Port": 5672
+ }
+ },
+ "Kafka": {
+ "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
+ "EnableAuthorization": false,
+ "SecurityProtocol": "SASL_PLAINTEXT",
+ "SaslMechanism": "PLAIN",
+ "SaslUserName": "lixiao",
+ "SaslPassword": "lixiao1980"
+ //"Topic": {
+ // "ReplicationFactor": 3,
+ // "NumPartitions": 1000
+ //}
+ },
+ //"Kafka": {
+ // "Connections": {
+ // "Default": {
+ // "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
+ // // "SecurityProtocol": "SASL_PLAINTEXT",
+ // // "SaslMechanism": "PLAIN",
+ // // "SaslUserName": "lixiao",
+ // // "SaslPassword": "lixiao1980",
+ // }
+ // },
+ // "Consumer": {
+ // "GroupId": "JiShe.CollectBus"
+ // },
+ // "Producer": {
+ // "MessageTimeoutMs": 6000,
+ // "Acks": -1
+ // },
+ // "Topic": {
+ // "ReplicationFactor": 3,
+ // "NumPartitions": 1000
+ // },
+ // "EventBus": {
+ // "GroupId": "JiShe.CollectBus",
+ // "TopicName": "DefaultTopicName"
+ // }
+ //},
+ "IoTDBOptions": {
+ "UserName": "root",
+ "Password": "root",
+ "ClusterList": [ "192.168.1.9:6667" ],
+ "PoolSize": 2,
+ "DataBaseName": "energy",
+ "OpenDebugMode": true,
+ "UseTableSessionPoolByDefault": false
+ },
+ "ServerTagName": "JiSheCollectBus",
+ "KafkaReplicationFactor": 3,
+ "NumPartitions": 30
+}
\ No newline at end of file
diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index c26f3da..1d185db 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -37,6 +37,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -107,6 +109,10 @@ Global
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
+ {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -128,6 +134,7 @@ Global
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
+ {82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 4373dd3..8b19e49 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -199,7 +199,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
- await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
+ //await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
diff --git a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index f5c12d6..37f84bf 100644
--- a/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/src/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -163,6 +163,7 @@
overflow-y: hidden;
color: #555;
} */
+
.caption {
padding: 9px;
overflow-y: hidden;
diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs
index 217e35c..74bef5f 100644
--- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs
@@ -42,7 +42,8 @@ namespace JiShe.CollectBus.Kafka.AdminClient
///
public IAdminClient GetInstance(IConfiguration configuration)
{
- var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
+ ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
+ var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var adminClientConfig = new AdminClientConfig()
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
@@ -113,6 +114,37 @@ namespace JiShe.CollectBus.Kafka.AdminClient
return metadata.Topics.Any(t => t.Topic == topic);
}
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ public Dictionary CheckPartitionsExists(string topic, int[] partitions)
+ {
+ var result = new Dictionary();
+ var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
+ if (metadata.Topics.Count == 0)
+ return partitions.ToDictionary(p => p, p => false);
+ var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
+ return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
+ }
+
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ public bool CheckPartitionsExist(string topic, int targetPartition)
+ {
+ var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
+ if (metadata.Topics.Count == 0)
+ return false;
+ var partitions = metadata.Topics[0].Partitions;
+ return partitions.Any(p => p.PartitionId == targetPartition);
+ }
+
public void Dispose()
{
Instance?.Dispose();
diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs
index c3d332d..00e51b3 100644
--- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs
@@ -12,5 +12,21 @@ namespace JiShe.CollectBus.Kafka.AdminClient
Task DeleteTopicAsync(string topic);
Task> ListTopicsAsync();
Task TopicExistsAsync(string topic);
+
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ Dictionary CheckPartitionsExists(string topic, int[] partitions);
+
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ bool CheckPartitionsExist(string topic, int targetPartition);
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
index 61cc788..a73eb79 100644
--- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs
@@ -1,5 +1,6 @@
using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
+using JiShe.CollectBus.Kafka.Producer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;
@@ -10,6 +11,10 @@ namespace JiShe.CollectBus.Kafka
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
+ // 注册Producer
+ context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
+ // 注册Consumer
+ context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
}
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
index 37efe3a..34d63fb 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs
@@ -9,7 +9,7 @@ using static Confluent.Kafka.ConfigPropertyNames;
namespace JiShe.CollectBus.Kafka.Consumer
{
- public abstract class ConsumerService : IConsumerService, IDisposable, ISingletonDependency
+ public abstract class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger> _logger;
private CancellationTokenSource _cancellationTokenSource;
@@ -25,11 +25,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
public IConsumer GetInstance(IConfiguration configuration)
{
- var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
+
+ ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
+ var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
- AutoOffsetReset = AutoOffsetReset.Earliest
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ EnableAutoCommit = false, // 禁止AutoCommit
+ Acks = Acks.All, // 需要所有副本响应才算消费完成
};
if (enableAuthorization)
diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
index 0cfed2e..c3e65f6 100644
--- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs
@@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Kafka.Producer
{
- public class ProducerService : IProducerService, IDisposable,ITransientDependency
+ public class ProducerService : IProducerService, IDisposable
{
private readonly ILogger> _logger;
@@ -27,11 +27,18 @@ namespace JiShe.CollectBus.Kafka.Producer
public IProducer GetInstance(IConfiguration configuration)
{
- var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
+ ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
+ var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
var consumerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
- AllowAutoCreateTopics = true
+ AllowAutoCreateTopics = true,
+ QueueBufferingMaxKbytes = 2097151, // 修改缓冲区最大为2GB,默认为1GB
+ CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
+ BatchSize = 32768, // 修改批次大小为32K
+ LingerMs = 20, // 修改等待时间为20ms
+ Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功
+ MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
};
if (enableAuthorization)