From 02f2a2cafc4e06046f39dd4ecb17a74cc326591f Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Mon, 21 Apr 2025 10:17:40 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E8=8C=83=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusIoTDBModule.cs | 35 +- .../Context/IoTDBRuntimeContext.cs | 4 +- .../Interface/IIoTDBProvider.cs | 2 +- .../Interface/IIoTDBSessionFactory.cs | 4 +- .../Interface/IIoTDBSessionPool.cs | 2 +- .../Options/IoTDBOptions.cs | 2 +- .../Provider/IoTDBProvider.cs | 33 +- .../Provider/IoTDBSessionFactory.cs | 19 +- .../Provider/SessionPoolAdapter.cs | 10 +- .../Provider/TableSessionPoolAdapter.cs | 10 +- .../AdminClient/AdminClientService.cs | 356 +++++++++--------- .../Attributes/KafkaSubscribeAttribute.cs | 120 +++--- .../Attributes/TopicAttribute.cs | 43 +-- .../Consumer/ConsumerService.cs | 13 +- .../Consumer/IConsumerService.cs | 74 ++-- .../Internal/HeadersFilter.cs | 38 +- .../Internal/IKafkaSubscribe.cs | 25 +- .../Internal/ISubscribeAck.cs | 29 +- .../Internal/KafkaOptionConfig.cs | 103 +++-- .../Internal/ReflectionHelper.cs | 180 +++++---- .../Internal/SubscribeResult.cs | 121 +++--- ...ensions.cs => KafkaSubscribeExtensions.cs} | 36 +- .../Producer/IProducerService.cs | 5 - .../Producer/ProducerService.cs | 7 + .../CollectBusApplicationModule.cs | 2 +- .../Samples/SampleAppService.cs | 10 +- .../BasicScheduledMeterReadingService.cs | 4 +- ...nergySystemScheduledMeterReadingService.cs | 2 +- .../Subscribers/SubscriberAppService.cs | 4 +- 29 files changed, 604 insertions(+), 689 deletions(-) rename modules/JiShe.CollectBus.Kafka/{KafkaSubcribesExtensions.cs => KafkaSubscribeExtensions.cs} (93%) diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs index 93bbbfe..2cce113 100644 --- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs +++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs @@ -1,33 +1,22 @@ using JiShe.CollectBus.IoTDB.Context; -using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; -using JiShe.CollectBus.IoTDB.Provider; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Volo.Abp.Modularity; -namespace JiShe.CollectBus.IoTDB +namespace JiShe.CollectBus.IoTDB; + +/// +/// CollectBusIoTDBModule +/// +public class CollectBusIoTDbModule : AbpModule { - public class CollectBusIoTDBModule : AbpModule + public override void ConfigureServices(ServiceConfigurationContext context) { - public override void ConfigureServices(ServiceConfigurationContext context) - { - - var configuration = context.Services.GetConfiguration(); - Configure(options => - { - configuration.GetSection(nameof(IoTDBOptions)).Bind(options); - }); + var configuration = context.Services.GetConfiguration(); + Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); - // 注册上下文为Scoped - context.Services.AddScoped(); - - // 注册Session工厂 - context.Services.AddSingleton(); - - // 注册Provider - context.Services.AddScoped(); - - } + // 注册上下文为Scoped + context.Services.AddScoped(); } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index cd99b00..f1619ac 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -6,11 +6,11 @@ namespace JiShe.CollectBus.IoTDB.Context /// /// IoTDB SessionPool 运行时上下文 /// - public class IoTDBRuntimeContext + public class IoTDbRuntimeContext { private readonly bool _defaultValue; - public IoTDBRuntimeContext(IOptions options) + public IoTDbRuntimeContext(IOptions options) { _defaultValue = options.Value.UseTableSessionPoolByDefault; UseTableSessionPool = _defaultValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs index bb47841..02ac3ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs @@ -7,7 +7,7 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置 /// - public interface IIoTDBProvider + public interface IIoTDbProvider { ///// ///// 切换 SessionPool diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs index 03cd4a6..c2337ea 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs @@ -3,8 +3,8 @@ /// /// Session 工厂接口 /// - public interface IIoTDBSessionFactory:IDisposable + public interface IIoTDbSessionFactory:IDisposable { - IIoTDBSessionPool GetSessionPool(bool useTableSession); + IIoTDbSessionPool GetSessionPool(bool useTableSession); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs index 026a83a..9587549 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs @@ -5,7 +5,7 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// Session 连接池 /// - public interface IIoTDBSessionPool : IDisposable + public interface IIoTDbSessionPool : IDisposable { /// /// 打开连接池 diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index c9c0610..69463ba 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -3,7 +3,7 @@ /// /// IOTDB配置 /// - public class IoTDBOptions + public class IoTDbOptions { /// /// 数据库名称,表模型才有,树模型为空 diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9e18ac8..7ce03cc 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -9,26 +9,33 @@ using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Provider { /// /// IoTDB数据源 /// - public class IoTDBProvider : IIoTDBProvider + public class IoTDbProvider : IIoTDbProvider, IScopedDependency { - private static readonly ConcurrentDictionary _metadataCache = new(); - private readonly ILogger _logger; - private readonly IIoTDBSessionFactory _sessionFactory; - private readonly IoTDBRuntimeContext _runtimeContext; + private static readonly ConcurrentDictionary MetadataCache = new(); + private readonly ILogger _logger; + private readonly IIoTDbSessionFactory _sessionFactory; + private readonly IoTDbRuntimeContext _runtimeContext; - private IIoTDBSessionPool CurrentSession => + private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - public IoTDBProvider( - ILogger logger, - IIoTDBSessionFactory sessionFactory, - IoTDBRuntimeContext runtimeContext) + /// + /// IoTDbProvider + /// + /// + /// + /// + public IoTDbProvider( + ILogger logger, + IIoTDbSessionFactory sessionFactory, + IoTDbRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; @@ -167,7 +174,7 @@ namespace JiShe.CollectBus.IoTDB.Provider var indexOf = metadata.ColumnNames.IndexOf(measurement); metadata.ColumnNames[indexOf] = (string)item1!; - + rowValues.Add(item2); } @@ -396,14 +403,14 @@ namespace JiShe.CollectBus.IoTDB.Provider var columns = CollectColumnMetadata(typeof(T)); var metadata = BuildDeviceMetadata(columns); - return _metadataCache.AddOrUpdate( + return MetadataCache.AddOrUpdate( typeof(T), addValueFactory: t => metadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { var columns = CollectColumnMetadata(t); var metadata = BuildDeviceMetadata(columns); - + //对现有值 existingValue 进行修改,返回新值 existingValue.ColumnNames = metadata.ColumnNames; return existingValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs index 572e93d..27b9200 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs @@ -2,6 +2,7 @@ using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Provider { @@ -9,25 +10,29 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 实现带缓存的Session工厂 /// - public class IoTDBSessionFactory : IIoTDBSessionFactory + public class IoTDbSessionFactory : IIoTDbSessionFactory, ISingletonDependency { - private readonly IoTDBOptions _options; - private readonly ConcurrentDictionary _pools = new(); + private readonly IoTDbOptions _options; + private readonly ConcurrentDictionary _pools = new(); private bool _disposed; - public IoTDBSessionFactory(IOptions options) + /// + /// IoTDbSessionFactory + /// + /// + public IoTDbSessionFactory(IOptions options) { _options = options.Value; } - public IIoTDBSessionPool GetSessionPool(bool useTableSession) + public IIoTDbSessionPool GetSessionPool(bool useTableSession) { - if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory)); + if (_disposed) throw new ObjectDisposedException(nameof(IoTDbSessionFactory)); return _pools.GetOrAdd(useTableSession, key => { var pool = key - ? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options) + ? (IIoTDbSessionPool)new TableSessionPoolAdapter(_options) : new SessionPoolAdapter(_options); pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ; diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index 44692bd..dd04f60 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 树模型连接池 /// - public class SessionPoolAdapter : IIoTDBSessionPool + public class SessionPoolAdapter : IIoTDbSessionPool { private readonly SessionPool _sessionPool; - private readonly IoTDBOptions _options; + private readonly IoTDbOptions _options; - public SessionPoolAdapter(IoTDBOptions options) + /// + /// SessionPoolAdapter + /// + /// + public SessionPoolAdapter(IoTDbOptions options) { _options = options; _sessionPool = new SessionPool.Builder() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index 1efd04f..be42ad7 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 表模型Session连接池 /// - public class TableSessionPoolAdapter : IIoTDBSessionPool + public class TableSessionPoolAdapter : IIoTDbSessionPool { private readonly TableSessionPool _sessionPool; - private readonly IoTDBOptions _options; + private readonly IoTDbOptions _options; - public TableSessionPoolAdapter(IoTDBOptions options) + /// + /// TableSessionPoolAdapter + /// + /// + public TableSessionPoolAdapter(IoTDbOptions options) { _options = options; _sessionPool = new TableSessionPool.Builder() diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs index 59e34fa..66643a5 100644 --- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs +++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs @@ -1,204 +1,190 @@ using Confluent.Kafka; -using Microsoft.Extensions.Configuration; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Confluent.Kafka.Admin; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; -namespace JiShe.CollectBus.Kafka.AdminClient +namespace JiShe.CollectBus.Kafka.AdminClient; + +public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency { - public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public AdminClientService(IConfiguration configuration, ILogger logger) { + _logger = logger; + Instance = GetInstance(configuration); + } - private readonly ILogger _logger; + /// + /// Gets or sets the instance. + /// + /// + /// The instance. + /// + public IAdminClient Instance { get; set; } - /// - /// Initializes a new instance of the class. - /// - /// The configuration. - /// The logger. - public AdminClientService(IConfiguration configuration, ILogger logger) + /// + /// 创建Kafka主题 + /// + /// + /// + /// + /// + public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) + { + try { - _logger = logger; - GetInstance(configuration); - } + if (await CheckTopicAsync(topic)) return; - /// - /// Gets or sets the instance. - /// - /// - /// The instance. - /// - public IAdminClient Instance { get; set; } = default; - /// - /// Gets the instance. - /// - /// The configuration. - /// - public IAdminClient GetInstance(IConfiguration configuration) - { - ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); - var adminClientConfig = new AdminClientConfig() + await Instance.CreateTopicsAsync(new[] { - BootstrapServers = configuration["Kafka:BootstrapServers"], - }; - if (enableAuthorization) - { - adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; - adminClientConfig.SaslMechanism = SaslMechanism.Plain; - adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; - } - Instance = new AdminClientBuilder(adminClientConfig).Build(); - return Instance; - } - - /// - /// Checks the topic asynchronous. - /// - /// The topic. - /// - public async Task CheckTopicAsync(string topic) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); - return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); - } - - /// - /// 判断Kafka主题是否存在 - /// - /// 主题名称 - /// 副本数量,不能高于Brokers数量 - /// - public async Task CheckTopicAsync(string topic,int numPartitions) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); - if(numPartitions > metadata.Brokers.Count) - { - throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ; - } - - return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); - } - - //// - /// 创建Kafka主题 - /// - /// 主题名称 - /// 主题分区数量 - /// 副本数量,不能高于Brokers数量 - /// - public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) - { - - try - { - if (await CheckTopicAsync(topic)) return; - - - await Instance.CreateTopicsAsync(new[] + new TopicSpecification { - new TopicSpecification - { - Name = topic, - NumPartitions = numPartitions, - ReplicationFactor = replicationFactor - } - }); - } - catch (CreateTopicsException e) - { - if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) - { - throw; + Name = topic, + NumPartitions = numPartitions, + ReplicationFactor = replicationFactor } - } + }); } - - /// - /// 删除Kafka主题 - /// - /// - /// - public async Task DeleteTopicAsync(string topic) + catch (CreateTopicsException e) { - await Instance.DeleteTopicsAsync(new[] { topic }); - } - - /// - /// 获取Kafka主题列表 - /// - /// - public async Task> ListTopicsAsync() - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return new List(metadata.Topics.Select(t => t.Topic)); - } - - /// - /// 判断Kafka主题是否存在 - /// - /// - /// - public async Task TopicExistsAsync(string topic) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return metadata.Topics.Any(t => t.Topic == topic); - } - - /// - /// 检测分区是否存在 - /// - /// - /// - /// - public Dictionary CheckPartitionsExists(string topic, int[] partitions) - { - var result = new Dictionary(); - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return partitions.ToDictionary(p => p, p => false); - var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet(); - return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p)); - } - - /// - /// 检测分区是否存在 - /// - /// - /// - /// - public bool CheckPartitionsExist(string topic, int targetPartition) - { - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return false; - var partitions = metadata.Topics[0].Partitions; - return partitions.Any(p => p.PartitionId == targetPartition); - } - - /// - /// 获取主题的分区数量 - /// - /// - /// - public int GetTopicPartitionsNum(string topic) - { - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return 0; - return metadata.Topics[0].Partitions.Count; - } - - public void Dispose() - { - Instance?.Dispose(); + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) throw; } } -} + + /// + /// 删除Kafka主题 + /// + /// + /// + public async Task DeleteTopicAsync(string topic) + { + await Instance.DeleteTopicsAsync(new[] { topic }); + } + + /// + /// 获取Kafka主题列表 + /// + /// + public async Task> ListTopicsAsync() + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return new List(metadata.Topics.Select(t => t.Topic)); + } + + /// + /// 判断Kafka主题是否存在 + /// + /// + /// + public async Task TopicExistsAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return metadata.Topics.Any(t => t.Topic == topic); + } + + /// + /// 检测分区是否存在 + /// + /// + /// + /// + public Dictionary CheckPartitionsExists(string topic, int[] partitions) + { + var result = new Dictionary(); + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return partitions.ToDictionary(p => p, p => false); + var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet(); + return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p)); + } + + /// + /// 检测分区是否存在 + /// + /// + /// + /// + public bool CheckPartitionsExist(string topic, int targetPartition) + { + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return false; + var partitions = metadata.Topics[0].Partitions; + return partitions.Any(p => p.PartitionId == targetPartition); + } + + /// + /// 获取主题的分区数量 + /// + /// + /// + public int GetTopicPartitionsNum(string topic) + { + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return 0; + return metadata.Topics[0].Partitions.Count; + } + + public void Dispose() + { + Instance?.Dispose(); + } + + /// + /// Gets the instance. + /// + /// The configuration. + /// + public IAdminClient GetInstance(IConfiguration configuration) + { + ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); + var adminClientConfig = new AdminClientConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + if (enableAuthorization) + { + adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + adminClientConfig.SaslMechanism = SaslMechanism.Plain; + adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + return new AdminClientBuilder(adminClientConfig).Build(); + } + + /// + /// Checks the topic asynchronous. + /// + /// The topic. + /// + public async Task CheckTopicAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } + + /// + /// 判断Kafka主题是否存在 + /// + /// 主题名称 + /// 副本数量,不能高于Brokers数量 + /// + public async Task CheckTopicAsync(string topic, int numPartitions) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + if (numPartitions > metadata.Brokers.Count) + throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。"); + + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs index 598caf0..3b2cfae 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs @@ -1,68 +1,60 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Attributes; -namespace JiShe.CollectBus.Kafka.Attributes +[AttributeUsage(AttributeTargets.Method)] +public class KafkaSubscribeAttribute : Attribute { - [AttributeUsage(AttributeTargets.Method)] - public class KafkaSubscribeAttribute : Attribute + /// + /// 订阅主题 + /// + /// + public KafkaSubscribeAttribute(string topic) { - /// - /// 订阅的主题 - /// - public string Topic { get; set; } = null!; - - /// - /// 分区 - /// - public int Partition { get; set; } = -1; - - /// - /// 消费者组 - /// - public string? GroupId { get; set; } = null;//"default" - - /// - /// 任务数(默认是多少个分区多少个任务) - /// 如设置订阅指定Partition则任务数始终为1 - /// - public int TaskCount { get; set; } = -1; - - /// - /// 批量处理数量 - /// - public int BatchSize { get; set; } = 100; - - /// - /// 是否启用批量处理 - /// - public bool EnableBatch { get; set; } = false; - - /// - /// 批次超时时间 - /// 格式:("00:05:00") - /// - public TimeSpan? BatchTimeout { get; set; }=null; - - - /// - /// 订阅主题 - /// - /// - public KafkaSubscribeAttribute(string topic) - { - this.Topic = topic; - } - - /// - /// 订阅主题 - /// - public KafkaSubscribeAttribute(string topic, int partition) - { - this.Topic = topic; - this.Partition = partition; - } + Topic = topic; } -} + + /// + /// 订阅主题 + /// + public KafkaSubscribeAttribute(string topic, int partition) + { + Topic = topic; + Partition = partition; + } + + /// + /// 订阅的主题 + /// + public string Topic { get; set; } = null!; + + /// + /// 分区 + /// + public int Partition { get; set; } = -1; + + /// + /// 消费者组 + /// + public string? GroupId { get; set; } = null; //"default" + + /// + /// 任务数(默认是多少个分区多少个任务) + /// 如设置订阅指定Partition则任务数始终为1 + /// + public int TaskCount { get; set; } = -1; + + /// + /// 批量处理数量 + /// + public int BatchSize { get; set; } = 100; + + /// + /// 是否启用批量处理 + /// + public bool EnableBatch { get; set; } = false; + + /// + /// 批次超时时间 + /// 格式:("00:05:00") + /// + public TimeSpan? BatchTimeout { get; set; } = null; +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs index 4cb2fff..a023edb 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs @@ -1,29 +1,22 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Attributes; -namespace JiShe.CollectBus.Kafka.Attributes +[AttributeUsage(AttributeTargets.Class, Inherited = false)] +public class TopicAttribute : Attribute { - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class TopicAttribute: Attribute + /// + /// Initializes a new instance of the class. + /// + /// The name. + public TopicAttribute(string name = "Default") { - /// - /// Initializes a new instance of the class. - /// - /// The name. - public TopicAttribute(string name = "Default") - { - Name = name; - } - - /// - /// Gets or sets the name. - /// - /// - /// The name. - /// - public string Name { get; set; } + Name = name; } -} + + /// + /// Gets or sets the name. + /// + /// + /// The name. + /// + public string Name { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 5093561..5313833 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -13,15 +13,18 @@ namespace JiShe.CollectBus.Kafka.Consumer public class ConsumerService : IConsumerService, IDisposable { private readonly ILogger _logger; - private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _consumerStore = new(); private readonly KafkaOptionConfig _kafkaOptionConfig; private class KafkaConsumer where TKey : notnull where TValue : class { } - public ConsumerService(IConfiguration configuration, ILogger logger, IOptions kafkaOptionConfig) + /// + /// ConsumerService + /// + /// + /// + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig) { - _configuration = configuration; _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; } @@ -165,10 +168,10 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// 订阅消息 /// - /// /// /// /// + /// /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { @@ -387,7 +390,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); } diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index 77901ef..32ade01 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -1,46 +1,50 @@ -using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Consumer; -namespace JiShe.CollectBus.Kafka.Consumer +public interface IConsumerService { - public interface IConsumerService - { - Task SubscribeAsync(string topic, Func> messageHandler, string? groupId=null) where TKey : notnull where TValue : class; + Task SubscribeAsync(string topic, Func> messageHandler, + string? groupId = null) where TKey : notnull where TValue : class; - /// - /// 订阅消息 - /// - /// - /// - /// - /// - Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class; + /// + /// 订阅消息 + /// + /// + /// + /// + /// + Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) + where TValue : class; - Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TKey : notnull where TValue : class; + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) + where TKey : notnull where TValue : class; - /// - /// 订阅消息 - /// - /// - /// - /// - /// - /// - Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; + /// + /// 订阅消息 + /// + /// + /// + /// + /// + /// + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) + where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) + where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) + where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) + where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) + where TValue : class; - void Unsubscribe() where TKey : notnull where TValue : class; - } -} + void Unsubscribe() where TKey : notnull where TValue : class; +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs index 7b479fa..8bcdcdc 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs @@ -1,30 +1,22 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +/// +/// 消息头过滤器 +/// +public class HeadersFilter : Dictionary { /// - /// 消息头过滤器 + /// 判断Headers是否匹配 /// - public class HeadersFilter : Dictionary + /// + /// + public bool Match(Headers headers) { - /// - /// 判断Headers是否匹配 - /// - /// - /// - public bool Match(Headers headers) - { - foreach (var kvp in this) - { - if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value)) - return false; - } - return true; - } + foreach (var kvp in this) + if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value)) + return false; + return true; } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs index 0ad450f..5345f1b 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs @@ -1,18 +1,11 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +/// +/// Kafka订阅者 +/// +/// 订阅者需要继承此接口并需要依赖注入,并使用标记 +/// +/// +public interface IKafkaSubscribe { - /// - /// Kafka订阅者 - /// - /// 订阅者需要继承此接口并需要依赖注入,并使用标记 - /// - /// - public interface IKafkaSubscribe - { - } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs index ce24d69..27512f7 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs @@ -1,21 +1,14 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +public interface ISubscribeAck { - public interface ISubscribeAck - { - /// - /// 是否成功标记 - /// - bool Ack { get; set; } + /// + /// 是否成功标记 + /// + bool Ack { get; set; } - /// - /// 消息 - /// - string? Msg { get; set; } - } -} + /// + /// 消息 + /// + string? Msg { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index a3fb126..5f7bdf1 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -1,68 +1,61 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +public class KafkaOptionConfig { - public class KafkaOptionConfig - { - /// - /// kafka地址 - /// - public string BootstrapServers { get; set; } = null!; + /// + /// kafka地址 + /// + public string BootstrapServers { get; set; } = null!; - /// - /// 服务器标识 - /// - public string ServerTagName { get; set; }= "KafkaFilterKey"; + /// + /// 服务器标识 + /// + public string ServerTagName { get; set; } = "KafkaFilterKey"; - /// - /// kafka主题副本数量 - /// - public short KafkaReplicationFactor { get; set; } + /// + /// kafka主题副本数量 + /// + public short KafkaReplicationFactor { get; set; } - /// - /// kafka主题分区数量 - /// - public int NumPartitions { get; set; } + /// + /// kafka主题分区数量 + /// + public int NumPartitions { get; set; } - /// - /// 是否开启过滤器 - /// - public bool EnableFilter { get; set; }= true; + /// + /// 是否开启过滤器 + /// + public bool EnableFilter { get; set; } = true; - /// - /// 是否开启认证 - /// - public bool EnableAuthorization { get; set; } = false; + /// + /// 是否开启认证 + /// + public bool EnableAuthorization { get; set; } = false; - /// - /// 安全协议 - /// - public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext; + /// + /// 安全协议 + /// + public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext; - /// - /// 认证方式 - /// - public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain; + /// + /// 认证方式 + /// + public SaslMechanism SaslMechanism { get; set; } = SaslMechanism.Plain; - /// - /// 用户名 - /// - public string? SaslUserName { get; set; } + /// + /// 用户名 + /// + public string? SaslUserName { get; set; } - /// - /// 密码 - /// - public string? SaslPassword { get; set; } + /// + /// 密码 + /// + public string? SaslPassword { get; set; } - /// - /// 首次采集时间 - /// - public DateTime FirstCollectionTime { get; set; } - - } -} + /// + /// 首次采集时间 + /// + public DateTime FirstCollectionTime { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs index ded11d7..a39db81 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs @@ -1,113 +1,103 @@ -using Newtonsoft.Json; -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; +using System.Collections; using System.Reflection; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +/// +/// 反射辅助类 +/// +public static class ReflectionHelper { /// - /// 反射辅助类 + /// 集合类型 + /// Item1:参数类型 + /// Item2:集合元素类型 /// - public static class ReflectionHelper + public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex = 0) { - /// - ///集合类型 - ///Item1:参数类型 - ///Item2:集合元素类型 - /// - public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0) - { - // 参数校验 - if (method == null) throw new ArgumentNullException(nameof(method)); - var parameters = method.GetParameters(); - if (parameterIndex < 0 || parameterIndex >= parameters.Length) - throw new ArgumentOutOfRangeException(nameof(parameterIndex)); + // 参数校验 + if (method == null) throw new ArgumentNullException(nameof(method)); + var parameters = method.GetParameters(); + if (parameterIndex < 0 || parameterIndex >= parameters.Length) + throw new ArgumentOutOfRangeException(nameof(parameterIndex)); - ParameterInfo param = parameters[parameterIndex]; - Type paramType = param.ParameterType; - Type? elementType = null; + var param = parameters[parameterIndex]; + var paramType = param.ParameterType; + Type? elementType = null; - // 判断是否是集合类型(排除字符串) - if (paramType != typeof(string) && IsEnumerableType(paramType)) - { - elementType = GetEnumerableElementType(paramType); - } + // 判断是否是集合类型(排除字符串) + if (paramType != typeof(string) && IsEnumerableType(paramType)) + elementType = GetEnumerableElementType(paramType); - return Tuple.Create(paramType, elementType); - - } + return Tuple.Create(paramType, elementType); + } - /// - /// 判断是否是集合类型(排除字符串) - /// - public static bool IsEnumerableType(this Type type) - { - return type.IsArray - || (type.IsGenericType && type.GetInterfaces() - .Any(t => t.IsGenericType - && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) - || type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable)); - } + /// + /// 判断是否是集合类型(排除字符串) + /// + public static bool IsEnumerableType(this Type type) + { + return type.IsArray + || (type.IsGenericType && type.GetInterfaces() + .Any(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) + || type.GetInterfaces().Any(t => t == typeof(IEnumerable)); + } - /// - /// 获取集合元素的类型 - /// - public static Type? GetEnumerableElementType(this Type type) - { - // 处理数组类型 - if (type.IsArray) - return type.GetElementType(); + /// + /// 获取集合元素的类型 + /// + public static Type? GetEnumerableElementType(this Type type) + { + // 处理数组类型 + if (type.IsArray) + return type.GetElementType(); - // 处理直接实现IEnumerable的类型(如IEnumerable本身) - if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) - return type.GetGenericArguments()[0]; + // 处理直接实现IEnumerable的类型(如IEnumerable本身) + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) + return type.GetGenericArguments()[0]; - // 处理通过接口实现IEnumerable的泛型集合(如List) - var genericEnumerable = type.GetInterfaces() - .FirstOrDefault(t => t.IsGenericType - && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); - if (genericEnumerable != null) - return genericEnumerable.GetGenericArguments()[0]; + // 处理通过接口实现IEnumerable的泛型集合(如List) + var genericEnumerable = type.GetInterfaces() + .FirstOrDefault(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); + if (genericEnumerable != null) + return genericEnumerable.GetGenericArguments()[0]; - // 处理非泛型集合类型(如 ArrayList) - if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) - return typeof(ArrayList); - // 返回null表示无法确定元素类型 - return null; - } + // 处理非泛型集合类型(如 ArrayList) + if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) + return typeof(ArrayList); + // 返回null表示无法确定元素类型 + return null; + } - // - /// 判断是否使用强转换 - /// - /// 目标类型 - /// - public static bool IsConvertType(this Type targetType) - { - // 处理可空类型 - Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; - // 情况1:值类型或基元类型(如 int、DateTime) - if (underlyingType.IsValueType || underlyingType.IsPrimitive) - return true; - // 情况2:字符串类型直接赋值 - else if (underlyingType == typeof(string)) - return true; - - // 情况3:枚举类型处理 - //else if (underlyingType.IsEnum) - //{ - // if (Enum.IsDefined(underlyingType, msg)) - // { - // convertedValue = Enum.Parse(underlyingType, msg.ToString()); - // return true; - // } - // return false; - //} - return false; - } + /// + /// 判断是否使用强转换 + /// + /// + /// + public static bool IsConvertType(this Type targetType) + { + // 处理可空类型 + var underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; + // 情况1:值类型或基元类型(如 int、DateTime) + if (underlyingType.IsValueType || underlyingType.IsPrimitive) + return true; + // 情况2:字符串类型直接赋值 + if (underlyingType == typeof(string)) + return true; + + // 情况3:枚举类型处理 + //else if (underlyingType.IsEnum) + //{ + // if (Enum.IsDefined(underlyingType, msg)) + // { + // convertedValue = Enum.Parse(underlyingType, msg.ToString()); + // return true; + // } + // return false; + //} + return false; } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs index 0c016ff..d3fbcf6 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs @@ -1,75 +1,62 @@ -using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using static System.Runtime.InteropServices.JavaScript.JSType; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +public class SubscribeResult : ISubscribeAck { - public class SubscribeResult: ISubscribeAck + /// + /// 是否成功 + /// + public bool Ack { get; set; } + + /// + /// 消息 + /// + public string? Msg { get; set; } + + + /// + /// 成功 + /// + /// 消息 + public SubscribeResult Success(string? msg = null) { - /// - /// 是否成功 - /// - public bool Ack { get; set; } - - /// - /// 消息 - /// - public string? Msg { get; set; } - - - /// - /// 成功 - /// - /// 消息 - public SubscribeResult Success(string? msg = null) - { - Ack = true; - Msg = msg; - return this; - } - - /// - /// 失败 - /// - /// - /// - /// - /// - public SubscribeResult Fail(string? msg = null) - { - Msg = msg; - Ack = false; - return this; - } + Ack = true; + Msg = msg; + return this; } - public static partial class SubscribeAck + /// + /// 失败 + /// + /// + /// + public SubscribeResult Fail(string? msg = null) { - - /// - /// 成功 - /// - /// 消息 - /// - public static ISubscribeAck Success(string? msg = null) - { - return new SubscribeResult().Success(msg); - } - - - /// - /// 失败 - /// - /// 消息 - /// - public static ISubscribeAck Fail(string? msg = null) - { - return new SubscribeResult().Fail(msg); - } + Msg = msg; + Ack = false; + return this; } - } + +public static class SubscribeAck +{ + /// + /// 成功 + /// + /// 消息 + /// + public static ISubscribeAck Success(string? msg = null) + { + return new SubscribeResult().Success(msg); + } + + + /// + /// 失败 + /// + /// 消息 + /// + public static ISubscribeAck Fail(string? msg = null) + { + return new SubscribeResult().Fail(msg); + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs similarity index 93% rename from modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs rename to modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index a5bf39c..64f8dec 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -8,26 +8,17 @@ using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Mvc.Abstractions; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System; using System.Collections; -using System.Collections.Generic; using System.ComponentModel; -using System.Linq.Expressions; using System.Reflection; -using System.Text.Json; -using System.Threading.Tasks; -using YamlDotNet.Core.Tokens; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.Kafka { - public static class KafkaSubcribesExtensions + public static class KafkaSubscribeExtensions { public static void UseInitKafkaTopic(this IServiceProvider provider) @@ -36,7 +27,7 @@ namespace JiShe.CollectBus.Kafka var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); foreach (var item in topics) @@ -48,8 +39,6 @@ namespace JiShe.CollectBus.Kafka /// /// 添加Kafka订阅 /// - /// - /// public static void UseKafkaSubscribe(this IServiceProvider provider) { var lifetime = provider.GetRequiredService(); @@ -57,8 +46,8 @@ namespace JiShe.CollectBus.Kafka lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); - int threadCount = 0; - int topicCount = 0; + var threadCount = 0; + var topicCount = 0; var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); if (string.IsNullOrWhiteSpace(assemblyPath)) { @@ -98,6 +87,9 @@ namespace JiShe.CollectBus.Kafka }); } + /// + /// 添加Kafka订阅 + /// public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly) { var provider = app.ApplicationServices; @@ -134,8 +126,6 @@ namespace JiShe.CollectBus.Kafka /// /// 构建Kafka订阅 /// - /// - /// private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() @@ -169,11 +159,6 @@ namespace JiShe.CollectBus.Kafka /// /// 启动后台消费线程 /// - /// - /// - /// - /// - /// private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); @@ -225,10 +210,6 @@ namespace JiShe.CollectBus.Kafka /// /// 处理消息 /// - /// - /// - /// - /// private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); @@ -351,9 +332,6 @@ namespace JiShe.CollectBus.Kafka } return false; } - - - } diff --git a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs index becea90..8cbe665 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs @@ -1,9 +1,4 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Producer { diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index db0efd8..16499b5 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -23,6 +23,13 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } private readonly KafkaOptionConfig _kafkaOptionConfig; + + /// + /// ProducerService + /// + /// + /// + /// public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig) { _configuration = configuration; diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 1874b25..988c9df 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -41,7 +41,7 @@ namespace JiShe.CollectBus; typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), - typeof(CollectBusIoTDBModule), + typeof(CollectBusIoTDbModule), typeof(CollectBusCassandraModule) )] public class CollectBusApplicationModule : AbpModule diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index f915fd6..2f22697 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -34,13 +34,13 @@ namespace JiShe.CollectBus.Samples; public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe { private readonly ILogger _logger; - private readonly IIoTDBProvider _iotDBProvider; - private readonly IoTDBRuntimeContext _dbContext; - private readonly IoTDBOptions _options; + private readonly IIoTDbProvider _iotDBProvider; + private readonly IoTDbRuntimeContext _dbContext; + private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; - public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options, - IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, + IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index a7f9f3c..4ff5dd6 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; - private readonly IIoTDBProvider _dbProvider; + private readonly IIoTDbProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; @@ -48,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IOptions kafkaOptions) { _logger = logger; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index fe398a5..b8fd08b 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService( ILogger logger, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository, IOptions kafkaOptions, IProducerService producerService, diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 2ac8801..8db1072 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -31,7 +31,7 @@ namespace JiShe.CollectBus.Subscribers private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; - private readonly IIoTDBProvider _dbProvider; + private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. @@ -47,7 +47,7 @@ namespace JiShe.CollectBus.Subscribers IServiceProvider serviceProvider, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordsRepository) { _logger = logger;