dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
6 changed files with 49 additions and 17 deletions
Showing only changes of commit 26ffffa233 - Show all commits

View File

@ -89,7 +89,9 @@
"SecurityProtocol": "SaslPlaintext",
"SaslMechanism": "Plain",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980"
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
//"Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000
@ -131,8 +133,6 @@
"UseTableSessionPoolByDefault": false
},
"ServerTagName": "JiSheCollectBus3",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"Cassandra": {
"ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
@ -8,6 +9,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Modularity;
using static Thrift.Server.TThreadPoolAsyncServer;
namespace JiShe.CollectBus.IoTDBProvider
{
@ -15,7 +17,12 @@ namespace JiShe.CollectBus.IoTDBProvider
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.Configure<IoTDBOptions>(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions)));
var configuration = context.Services.GetConfiguration();
Configure<IoTDBOptions>(options =>
{
configuration.GetSection(nameof(IoTDBOptions)).Bind(options);
});
// 注册上下文为Scoped
context.Services.AddScoped<IoTDBRuntimeContext>();

View File

@ -1,4 +1,5 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Producer;
using Microsoft.AspNetCore.Builder;
@ -17,14 +18,23 @@ namespace JiShe.CollectBus.Kafka
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
var kafkaSection = configuration.GetSection("Kafka");
KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig ();
kafkaSection.Bind(kafkaOptionConfig);
if (configuration["ServerTagName"] != null)
//var kafkaSection = configuration.GetSection(CommonConst.Kafka);
//KafkaOptionConfig kafkaOptionConfig = new KafkaOptionConfig ();
//kafkaSection.Bind(kafkaOptionConfig);
//if (configuration[CommonConst.ServerTagName] != null)
//{
// kafkaOptionConfig.ServerTagName = configuration[CommonConst.ServerTagName]!;
//}
//context.Services.AddSingleton(kafkaOptionConfig);
context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
Configure<KafkaOptionConfig>(options =>
{
kafkaOptionConfig.ServerTagName = configuration["ServerTagName"]!;
}
context.Services.AddSingleton(kafkaOptionConfig);
configuration.GetSection(CommonConst.Kafka).Bind(options);
});
// 注册Producer
context.Services.AddSingleton<IProducerService, ProducerService>();
// 注册Consumer

View File

@ -12,4 +12,8 @@
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project>

View File

@ -19,6 +19,16 @@ namespace JiShe.CollectBus.Kafka
/// </summary>
public string ServerTagName { get; set; }= "KafkaFilterKey";
/// <summary>
/// kafka主题副本数量
/// </summary>
public int KafkaReplicationFactor { get; set; }
/// <summary>
/// kafka主题分区数量
/// </summary>
public int NumPartitions { get; set; }
/// <summary>
/// 是否开启过滤器
/// </summary>

View File

@ -1,10 +1,10 @@
using Confluent.Kafka;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Mvc.ApplicationParts;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@ -109,12 +109,13 @@ namespace JiShe.CollectBus.Kafka
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
.Where(x => x.Attribute != null)
.ToArray();
var configuration = provider.GetRequiredService<IConfiguration>();
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
var adminClientService = provider.GetRequiredService<IAdminClientService>();
int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
int partitionCount = configuration.GetValue<int>(CommonConst.NumPartitions);
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)
partitionCount = 1;
for (int i = 0; i < partitionCount; i++)