优化发布订阅

This commit is contained in:
zenghongyao 2025-04-17 14:39:14 +08:00
parent 871ed615a4
commit 0ba64a4b90
12 changed files with 47 additions and 52 deletions

View File

@ -7,6 +7,7 @@ using System.Threading.Tasks;
using DeviceDetectorNET.Class.Device; using DeviceDetectorNET.Class.Device;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;

View File

@ -5,6 +5,7 @@ using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device; using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;

View File

@ -265,7 +265,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
return aa == null; return aa == null;
} }
[KafkaSubscribe("test-topic1")] //[KafkaSubscribe("test-topic1")]
public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj) public async Task<ISubscribeAck> KafkaSubscribeAsync(object obj)
{ {

View File

@ -1,4 +1,5 @@
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;

View File

@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device; using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts namespace JiShe.CollectBus.Common.Consts
{ {
public class ProtocolConst public class ProtocolConst
{ {

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
@ -7,7 +8,7 @@ using System.Reflection;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Protocol.Contracts namespace JiShe.CollectBus.Common.Extensions
{ {
public class ProtocolConstExtensions public class ProtocolConstExtensions
{ {

View File

@ -361,45 +361,5 @@ namespace JiShe.CollectBus.Host
}); });
}); });
} }
/// <summary>
/// 配置Kafka主题
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
public void ConfigureKafkaTopic(ServiceConfigurationContext context, IConfiguration configuration)
{
var adminClient = new AdminClientBuilder(new AdminClientConfig
{
BootstrapServers = configuration.GetConnectionString(CommonConst.Kafka)
}).Build();
try
{
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics)
{
topicSpecifications.Add(new TopicSpecification
{
Name = item,
NumPartitions = 3,
ReplicationFactor = 1
});
}
adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
throw;
}
}
}
} }
} }

View File

@ -50,7 +50,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记 EnablePartitionEof = true, // 启用分区末尾标记
AllowAutoCreateTopics= true, // 启用自动创建 //AllowAutoCreateTopics= true, // 启用自动创建
FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB FetchMaxBytes = 1024 * 1024 * 50 // 增加拉取大小50MB
}; };

View File

@ -1,5 +1,6 @@
using Confluent.Kafka; using Confluent.Kafka;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Kafka.AdminClient; using JiShe.CollectBus.Kafka.AdminClient;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Consumer;
@ -8,6 +9,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Reflection; using System.Reflection;
namespace JiShe.CollectBus.Kafka namespace JiShe.CollectBus.Kafka
@ -23,6 +25,18 @@ namespace JiShe.CollectBus.Kafka
{ {
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
lifetime.ApplicationStarted.Register(() => lifetime.ApplicationStarted.Register(() =>
{ {
var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>(); var logger = provider.GetRequiredService<ILogger<CollectBusKafkaModule>>();
@ -55,7 +69,7 @@ namespace JiShe.CollectBus.Kafka
{ {
if (subscribe!=null) if (subscribe!=null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger); Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -70,6 +84,17 @@ namespace JiShe.CollectBus.Kafka
{ {
var provider = app.ApplicationServices; var provider = app.ApplicationServices;
var lifetime = provider.GetRequiredService<IHostApplicationLifetime>(); var lifetime = provider.GetRequiredService<IHostApplicationLifetime>();
//初始化主题信息
var kafkaAdminClient = provider.GetRequiredService<IAdminClientService>();
var kafkaOptions = provider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor).ConfigureAwait(false).GetAwaiter().GetResult();
}
lifetime.ApplicationStarted.Register(() => lifetime.ApplicationStarted.Register(() =>
{ {
@ -88,7 +113,7 @@ namespace JiShe.CollectBus.Kafka
if (subscribe != null) if (subscribe != null)
{ {
Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger); Tuple<int, int> tuple = BuildKafkaSubscriber(subscribe, provider, logger, kafkaOptions.Value);
threadCount += tuple.Item1; threadCount += tuple.Item1;
topicCount += tuple.Item2; topicCount += tuple.Item2;
} }
@ -103,17 +128,17 @@ namespace JiShe.CollectBus.Kafka
/// </summary> /// </summary>
/// <param name="subscribe"></param> /// <param name="subscribe"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger) private static Tuple<int,int> BuildKafkaSubscriber(object subscribe, IServiceProvider provider,ILogger<CollectBusKafkaModule> logger, KafkaOptionConfig kafkaOptionConfig)
{ {
var subscribedMethods = subscribe.GetType().GetMethods() var subscribedMethods = subscribe.GetType().GetMethods()
.Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() }) .Select(m => new { Method = m, Attribute = m.GetCustomAttribute<KafkaSubscribeAttribute>() })
.Where(x => x.Attribute != null) .Where(x => x.Attribute != null)
.ToArray(); .ToArray();
var configuration = provider.GetRequiredService<IConfiguration>(); //var configuration = provider.GetRequiredService<IConfiguration>();
int threadCount = 0; int threadCount = 0;
foreach (var sub in subscribedMethods) foreach (var sub in subscribedMethods)
{ {
int partitionCount = configuration.GetValue<int>(CommonConst.NumPartitions); int partitionCount = kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>(); //var adminClientService = provider.GetRequiredService<IAdminClientService>();
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)
@ -154,7 +179,7 @@ namespace JiShe.CollectBus.Kafka
logger.LogError($"kafka批量消费异常:{ex.Message}"); logger.LogError($"kafka批量消费异常:{ex.Message}");
} }
return await Task.FromResult(false); return await Task.FromResult(false);
}); }, attr.GroupId, attr.BatchSize,attr.BatchTimeout);
} }
else else
{ {
@ -171,7 +196,7 @@ namespace JiShe.CollectBus.Kafka
logger.LogError($"kafka消费异常:{ex.Message}"); logger.LogError($"kafka消费异常:{ex.Message}");
} }
return await Task.FromResult(false); return await Task.FromResult(false);
}); }, attr.GroupId);
} }
} }

View File

@ -13,6 +13,7 @@ using JiShe.CollectBus.IotSystems.Protocols;
using MassTransit; using MassTransit;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Common.Consts;
namespace JiShe.CollectBus.Protocol.Contracts.Abstracts namespace JiShe.CollectBus.Protocol.Contracts.Abstracts
{ {

View File

@ -20,4 +20,8 @@
<ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" /> <ProjectReference Include="..\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="Extensions\" />
</ItemGroup>
</Project> </Project>