dev #2
@ -0,0 +1,31 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
|
||||||
|
<PropertyGroup>
|
||||||
|
<OutputType>Exe</OutputType>
|
||||||
|
<TargetFramework>net8.0</TargetFramework>
|
||||||
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<Content Include="appsettings.json">
|
||||||
|
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
|
||||||
|
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
|
||||||
|
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
|
||||||
|
</Content>
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.0-preview.3.25171.5" />
|
||||||
|
<PackageReference Include="Serilog" Version="4.2.0" />
|
||||||
|
<PackageReference Include="Serilog.Extensions.Logging" Version="9.0.1" />
|
||||||
|
<PackageReference Include="Serilog.Settings.Configuration" Version="9.0.0" />
|
||||||
|
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
|
||||||
|
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
</Project>
|
||||||
79
JiShe.CollectBus.Kafka.Test/Program.cs
Normal file
79
JiShe.CollectBus.Kafka.Test/Program.cs
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
// See https://aka.ms/new-console-template for more information
|
||||||
|
using Confluent.Kafka;
|
||||||
|
using JiShe.CollectBus.Kafka.AdminClient;
|
||||||
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Serilog;
|
||||||
|
using System.Text.Json;
|
||||||
|
using static Confluent.Kafka.ConfigPropertyNames;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// 构建配置
|
||||||
|
var config = new ConfigurationBuilder()
|
||||||
|
.SetBasePath(Directory.GetCurrentDirectory())
|
||||||
|
.AddJsonFile("appsettings.json")
|
||||||
|
.Build();
|
||||||
|
// 直接读取配置项
|
||||||
|
var greeting = config["ServerTagName"];
|
||||||
|
Console.WriteLine(greeting); // 输出: Hello, World!
|
||||||
|
|
||||||
|
|
||||||
|
// 创建服务容器
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
// 注册 IConfiguration 实例
|
||||||
|
services.AddSingleton<IConfiguration>(config);
|
||||||
|
|
||||||
|
// 初始化日志
|
||||||
|
Log.Logger = new LoggerConfiguration()
|
||||||
|
.ReadFrom.Configuration(config) // 从 appsettings.json 读取配置
|
||||||
|
.CreateLogger();
|
||||||
|
|
||||||
|
// 配置日志系统
|
||||||
|
services.AddLogging(logging =>
|
||||||
|
{
|
||||||
|
logging.ClearProviders();
|
||||||
|
logging.AddSerilog();
|
||||||
|
});
|
||||||
|
services.AddSingleton<IAdminClientService, AdminClientService>();
|
||||||
|
services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
|
||||||
|
//services.AddSingleton(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
|
||||||
|
|
||||||
|
// 构建ServiceProvider
|
||||||
|
var serviceProvider = services.BuildServiceProvider();
|
||||||
|
|
||||||
|
// 获取日志记录器工厂
|
||||||
|
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
|
||||||
|
var logger = loggerFactory.CreateLogger<Program>();
|
||||||
|
logger.LogInformation("程序启动");
|
||||||
|
|
||||||
|
var adminClientService = serviceProvider.GetRequiredService<IAdminClientService>();
|
||||||
|
|
||||||
|
string topic = "test-topic";
|
||||||
|
//await adminClientService.DeleteTopicAsync(topic);
|
||||||
|
// 创建 topic
|
||||||
|
await adminClientService.CreateTopicAsync(topic, 3, 3);
|
||||||
|
|
||||||
|
|
||||||
|
var producerService = serviceProvider.GetRequiredService<IProducerService<Null, string>>();
|
||||||
|
int i = 1;
|
||||||
|
while (i <= 10)
|
||||||
|
{
|
||||||
|
await producerService.ProduceAsync(topic, JsonSerializer.Serialize(new { topic = topic, val = i }));
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
var key = Console.ReadKey(intercept: true); // intercept:true 隐藏按键显示
|
||||||
|
|
||||||
|
if (key.Key == ConsoleKey.Escape)
|
||||||
|
{
|
||||||
|
Console.WriteLine("\n程序已退出");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(serviceProvider as IDisposable)?.Dispose();
|
||||||
134
JiShe.CollectBus.Kafka.Test/appsettings.json
Normal file
134
JiShe.CollectBus.Kafka.Test/appsettings.json
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
{
|
||||||
|
"Serilog": {
|
||||||
|
"Using": [
|
||||||
|
"Serilog.Sinks.Console",
|
||||||
|
"Serilog.Sinks.File"
|
||||||
|
],
|
||||||
|
"MinimumLevel": {
|
||||||
|
"Default": "Information",
|
||||||
|
"Override": {
|
||||||
|
"Microsoft": "Warning",
|
||||||
|
"Volo.Abp": "Warning",
|
||||||
|
"Hangfire": "Warning",
|
||||||
|
"DotNetCore.CAP": "Warning",
|
||||||
|
"Serilog.AspNetCore": "Information",
|
||||||
|
"Microsoft.EntityFrameworkCore": "Warning",
|
||||||
|
"Microsoft.AspNetCore": "Warning"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"WriteTo": [
|
||||||
|
{
|
||||||
|
"Name": "Console"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Name": "File",
|
||||||
|
"Args": {
|
||||||
|
"path": "./logs/logs-.txt",
|
||||||
|
"rollingInterval": "Day"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"App": {
|
||||||
|
"SelfUrl": "http://localhost:44315",
|
||||||
|
"CorsOrigins": "http://localhost:4200,http://localhost:3100"
|
||||||
|
},
|
||||||
|
"ConnectionStrings": {
|
||||||
|
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
|
||||||
|
"Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
|
||||||
|
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
||||||
|
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
||||||
|
},
|
||||||
|
"Redis": {
|
||||||
|
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||||
|
"DefaultDB": "14",
|
||||||
|
"HangfireDB": "15"
|
||||||
|
},
|
||||||
|
"Jwt": {
|
||||||
|
"Audience": "JiShe.CollectBus",
|
||||||
|
"SecurityKey": "dzehzRz9a8asdfasfdadfasdfasdfafsdadfasbasdf=",
|
||||||
|
"Issuer": "JiShe.CollectBus",
|
||||||
|
"ExpirationTime": 2
|
||||||
|
},
|
||||||
|
"HealthCheck": {
|
||||||
|
"IsEnable": true,
|
||||||
|
"MySql": {
|
||||||
|
"IsEnable": true
|
||||||
|
},
|
||||||
|
"Pings": {
|
||||||
|
"IsEnable": true,
|
||||||
|
"Host": "https://www.baidu.com/",
|
||||||
|
"TimeOut": 5000
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"SwaggerConfig": [
|
||||||
|
{
|
||||||
|
"GroupName": "Basic",
|
||||||
|
"Title": "【后台管理】基础模块",
|
||||||
|
"Version": "V1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"GroupName": "Business",
|
||||||
|
"Title": "【后台管理】业务模块",
|
||||||
|
"Version": "V1"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"Cap": {
|
||||||
|
"RabbitMq": {
|
||||||
|
"HostName": "118.190.144.92",
|
||||||
|
"UserName": "collectbus",
|
||||||
|
"Password": "123456",
|
||||||
|
"Port": 5672
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Kafka": {
|
||||||
|
"BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092",
|
||||||
|
"EnableAuthorization": false,
|
||||||
|
"SecurityProtocol": "SASL_PLAINTEXT",
|
||||||
|
"SaslMechanism": "PLAIN",
|
||||||
|
"SaslUserName": "lixiao",
|
||||||
|
"SaslPassword": "lixiao1980"
|
||||||
|
//"Topic": {
|
||||||
|
// "ReplicationFactor": 3,
|
||||||
|
// "NumPartitions": 1000
|
||||||
|
//}
|
||||||
|
},
|
||||||
|
//"Kafka": {
|
||||||
|
// "Connections": {
|
||||||
|
// "Default": {
|
||||||
|
// "BootstrapServers": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092"
|
||||||
|
// // "SecurityProtocol": "SASL_PLAINTEXT",
|
||||||
|
// // "SaslMechanism": "PLAIN",
|
||||||
|
// // "SaslUserName": "lixiao",
|
||||||
|
// // "SaslPassword": "lixiao1980",
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// "Consumer": {
|
||||||
|
// "GroupId": "JiShe.CollectBus"
|
||||||
|
// },
|
||||||
|
// "Producer": {
|
||||||
|
// "MessageTimeoutMs": 6000,
|
||||||
|
// "Acks": -1
|
||||||
|
// },
|
||||||
|
// "Topic": {
|
||||||
|
// "ReplicationFactor": 3,
|
||||||
|
// "NumPartitions": 1000
|
||||||
|
// },
|
||||||
|
// "EventBus": {
|
||||||
|
// "GroupId": "JiShe.CollectBus",
|
||||||
|
// "TopicName": "DefaultTopicName"
|
||||||
|
// }
|
||||||
|
//},
|
||||||
|
"IoTDBOptions": {
|
||||||
|
"UserName": "root",
|
||||||
|
"Password": "root",
|
||||||
|
"ClusterList": [ "192.168.1.9:6667" ],
|
||||||
|
"PoolSize": 2,
|
||||||
|
"DataBaseName": "energy",
|
||||||
|
"OpenDebugMode": true,
|
||||||
|
"UseTableSessionPoolByDefault": false
|
||||||
|
},
|
||||||
|
"ServerTagName": "JiSheCollectBus",
|
||||||
|
"KafkaReplicationFactor": 3,
|
||||||
|
"NumPartitions": 30
|
||||||
|
}
|
||||||
@ -37,6 +37,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvi
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.Test", "src\JiShe.CollectBus.Protocol.Test\JiShe.CollectBus.Protocol.Test.csproj", "{A377955E-7EA1-6F29-8CF7-774569E93925}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{82E4562A-3A7F-4372-8D42-8AE41BA56C04}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -107,6 +109,10 @@ Global
|
|||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
{A377955E-7EA1-6F29-8CF7-774569E93925}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@ -128,6 +134,7 @@ Global
|
|||||||
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{F0288175-F0EC-48BD-945F-CF1512850943} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
|
{82E4562A-3A7F-4372-8D42-8AE41BA56C04} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
|
||||||
|
|||||||
@ -199,7 +199,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
#else
|
#else
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
|
||||||
|
|||||||
@ -163,6 +163,7 @@
|
|||||||
overflow-y: hidden;
|
overflow-y: hidden;
|
||||||
color: #555;
|
color: #555;
|
||||||
} */
|
} */
|
||||||
|
|
||||||
.caption {
|
.caption {
|
||||||
padding: 9px;
|
padding: 9px;
|
||||||
overflow-y: hidden;
|
overflow-y: hidden;
|
||||||
|
|||||||
@ -42,7 +42,8 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public IAdminClient GetInstance(IConfiguration configuration)
|
public IAdminClient GetInstance(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
|
||||||
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
|
||||||
var adminClientConfig = new AdminClientConfig()
|
var adminClientConfig = new AdminClientConfig()
|
||||||
{
|
{
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
@ -113,6 +114,37 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
return metadata.Topics.Any(t => t.Topic == topic);
|
return metadata.Topics.Any(t => t.Topic == topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 检测分区是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <param name="partitions"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions)
|
||||||
|
{
|
||||||
|
var result = new Dictionary<int, bool>();
|
||||||
|
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
|
||||||
|
if (metadata.Topics.Count == 0)
|
||||||
|
return partitions.ToDictionary(p => p, p => false);
|
||||||
|
var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
|
||||||
|
return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 检测分区是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <param name="targetPartition"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public bool CheckPartitionsExist(string topic, int targetPartition)
|
||||||
|
{
|
||||||
|
var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
|
||||||
|
if (metadata.Topics.Count == 0)
|
||||||
|
return false;
|
||||||
|
var partitions = metadata.Topics[0].Partitions;
|
||||||
|
return partitions.Any(p => p.PartitionId == targetPartition);
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose()
|
public void Dispose()
|
||||||
{
|
{
|
||||||
Instance?.Dispose();
|
Instance?.Dispose();
|
||||||
|
|||||||
@ -12,5 +12,21 @@ namespace JiShe.CollectBus.Kafka.AdminClient
|
|||||||
Task DeleteTopicAsync(string topic);
|
Task DeleteTopicAsync(string topic);
|
||||||
Task<List<string>> ListTopicsAsync();
|
Task<List<string>> ListTopicsAsync();
|
||||||
Task<bool> TopicExistsAsync(string topic);
|
Task<bool> TopicExistsAsync(string topic);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 检测分区是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <param name="partitions"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
Dictionary<int, bool> CheckPartitionsExists(string topic, int[] partitions);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 检测分区是否存在
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="topic"></param>
|
||||||
|
/// <param name="targetPartition"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
bool CheckPartitionsExist(string topic, int targetPartition);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
using Confluent.Kafka;
|
using Confluent.Kafka;
|
||||||
using JiShe.CollectBus.Kafka.Consumer;
|
using JiShe.CollectBus.Kafka.Consumer;
|
||||||
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Volo.Abp.Modularity;
|
using Volo.Abp.Modularity;
|
||||||
@ -10,6 +11,10 @@ namespace JiShe.CollectBus.Kafka
|
|||||||
{
|
{
|
||||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||||
{
|
{
|
||||||
|
// 注册Producer
|
||||||
|
context.Services.AddTransient(typeof(IProducerService<,>), typeof(ProducerService<,>));
|
||||||
|
// 注册Consumer
|
||||||
|
context.Services.AddTransient(typeof(IConsumerService<,>), typeof(ConsumerService<,>));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,7 @@ using static Confluent.Kafka.ConfigPropertyNames;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Consumer
|
namespace JiShe.CollectBus.Kafka.Consumer
|
||||||
{
|
{
|
||||||
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable, ISingletonDependency
|
public abstract class ConsumerService<TKey, TValue> : IConsumerService<TKey, TValue>, IDisposable
|
||||||
{
|
{
|
||||||
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
|
private readonly ILogger<ConsumerService<TKey, TValue>> _logger;
|
||||||
private CancellationTokenSource _cancellationTokenSource;
|
private CancellationTokenSource _cancellationTokenSource;
|
||||||
@ -25,11 +25,15 @@ namespace JiShe.CollectBus.Kafka.Consumer
|
|||||||
|
|
||||||
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
|
public IConsumer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
|
||||||
|
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
|
||||||
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
|
||||||
var consumerConfig = new ConsumerConfig
|
var consumerConfig = new ConsumerConfig
|
||||||
{
|
{
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
|
EnableAutoCommit = false, // 禁止AutoCommit
|
||||||
|
Acks = Acks.All, // 需要所有副本响应才算消费完成
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuthorization)
|
if (enableAuthorization)
|
||||||
|
|||||||
@ -11,7 +11,7 @@ using Volo.Abp.DependencyInjection;
|
|||||||
|
|
||||||
namespace JiShe.CollectBus.Kafka.Producer
|
namespace JiShe.CollectBus.Kafka.Producer
|
||||||
{
|
{
|
||||||
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable,ITransientDependency
|
public class ProducerService<TKey, TValue> : IProducerService<TKey, TValue>, IDisposable
|
||||||
{
|
{
|
||||||
|
|
||||||
private readonly ILogger<ProducerService<TKey, TValue>> _logger;
|
private readonly ILogger<ProducerService<TKey, TValue>> _logger;
|
||||||
@ -27,11 +27,18 @@ namespace JiShe.CollectBus.Kafka.Producer
|
|||||||
|
|
||||||
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
public IProducer<TKey, TValue> GetInstance(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]);
|
ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
|
||||||
|
var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
|
||||||
var consumerConfig = new ProducerConfig
|
var consumerConfig = new ProducerConfig
|
||||||
{
|
{
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
AllowAutoCreateTopics = true
|
AllowAutoCreateTopics = true,
|
||||||
|
QueueBufferingMaxKbytes = 2097151, // 修改缓冲区最大为2GB,默认为1GB
|
||||||
|
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd
|
||||||
|
BatchSize = 32768, // 修改批次大小为32K
|
||||||
|
LingerMs = 20, // 修改等待时间为20ms
|
||||||
|
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功
|
||||||
|
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
|
||||||
};
|
};
|
||||||
|
|
||||||
if (enableAuthorization)
|
if (enableAuthorization)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user