diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs
index 93bbbfe..2cce113 100644
--- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs
+++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs
@@ -1,33 +1,22 @@
using JiShe.CollectBus.IoTDB.Context;
-using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
-using JiShe.CollectBus.IoTDB.Provider;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Modularity;
-namespace JiShe.CollectBus.IoTDB
+namespace JiShe.CollectBus.IoTDB;
+
+///
+/// CollectBusIoTDBModule
+///
+public class CollectBusIoTDbModule : AbpModule
{
- public class CollectBusIoTDBModule : AbpModule
+ public override void ConfigureServices(ServiceConfigurationContext context)
{
- public override void ConfigureServices(ServiceConfigurationContext context)
- {
-
- var configuration = context.Services.GetConfiguration();
- Configure(options =>
- {
- configuration.GetSection(nameof(IoTDBOptions)).Bind(options);
- });
+ var configuration = context.Services.GetConfiguration();
+ Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); });
- // 注册上下文为Scoped
- context.Services.AddScoped();
-
- // 注册Session工厂
- context.Services.AddSingleton();
-
- // 注册Provider
- context.Services.AddScoped();
-
- }
+ // 注册上下文为Scoped
+ context.Services.AddScoped();
}
-}
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs
index cd99b00..f1619ac 100644
--- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs
@@ -6,11 +6,11 @@ namespace JiShe.CollectBus.IoTDB.Context
///
/// IoTDB SessionPool 运行时上下文
///
- public class IoTDBRuntimeContext
+ public class IoTDbRuntimeContext
{
private readonly bool _defaultValue;
- public IoTDBRuntimeContext(IOptions options)
+ public IoTDbRuntimeContext(IOptions options)
{
_defaultValue = options.Value.UseTableSessionPoolByDefault;
UseTableSessionPool = _defaultValue;
diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs
index bb47841..02ac3ee 100644
--- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs
@@ -7,7 +7,7 @@ namespace JiShe.CollectBus.IoTDB.Interface
///
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置
///
- public interface IIoTDBProvider
+ public interface IIoTDbProvider
{
/////
///// 切换 SessionPool
diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs
index 03cd4a6..c2337ea 100644
--- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs
@@ -3,8 +3,8 @@
///
/// Session 工厂接口
///
- public interface IIoTDBSessionFactory:IDisposable
+ public interface IIoTDbSessionFactory:IDisposable
{
- IIoTDBSessionPool GetSessionPool(bool useTableSession);
+ IIoTDbSessionPool GetSessionPool(bool useTableSession);
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
index 026a83a..9587549 100644
--- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
@@ -5,7 +5,7 @@ namespace JiShe.CollectBus.IoTDB.Interface
///
/// Session 连接池
///
- public interface IIoTDBSessionPool : IDisposable
+ public interface IIoTDbSessionPool : IDisposable
{
///
/// 打开连接池
diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
index c9c0610..69463ba 100644
--- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
@@ -3,7 +3,7 @@
///
/// IOTDB配置
///
- public class IoTDBOptions
+ public class IoTDbOptions
{
///
/// 数据库名称,表模型才有,树模型为空
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
index 9e18ac8..7ce03cc 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
@@ -9,26 +9,33 @@ using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
+using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.IoTDB.Provider
{
///
/// IoTDB数据源
///
- public class IoTDBProvider : IIoTDBProvider
+ public class IoTDbProvider : IIoTDbProvider, IScopedDependency
{
- private static readonly ConcurrentDictionary _metadataCache = new();
- private readonly ILogger _logger;
- private readonly IIoTDBSessionFactory _sessionFactory;
- private readonly IoTDBRuntimeContext _runtimeContext;
+ private static readonly ConcurrentDictionary MetadataCache = new();
+ private readonly ILogger _logger;
+ private readonly IIoTDbSessionFactory _sessionFactory;
+ private readonly IoTDbRuntimeContext _runtimeContext;
- private IIoTDBSessionPool CurrentSession =>
+ private IIoTDbSessionPool CurrentSession =>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
- public IoTDBProvider(
- ILogger logger,
- IIoTDBSessionFactory sessionFactory,
- IoTDBRuntimeContext runtimeContext)
+ ///
+ /// IoTDbProvider
+ ///
+ ///
+ ///
+ ///
+ public IoTDbProvider(
+ ILogger logger,
+ IIoTDbSessionFactory sessionFactory,
+ IoTDbRuntimeContext runtimeContext)
{
_logger = logger;
_sessionFactory = sessionFactory;
@@ -167,7 +174,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var indexOf = metadata.ColumnNames.IndexOf(measurement);
metadata.ColumnNames[indexOf] = (string)item1!;
-
+
rowValues.Add(item2);
}
@@ -396,14 +403,14 @@ namespace JiShe.CollectBus.IoTDB.Provider
var columns = CollectColumnMetadata(typeof(T));
var metadata = BuildDeviceMetadata(columns);
- return _metadataCache.AddOrUpdate(
+ return MetadataCache.AddOrUpdate(
typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) =>
{
var columns = CollectColumnMetadata(t);
var metadata = BuildDeviceMetadata(columns);
-
+
//对现有值 existingValue 进行修改,返回新值
existingValue.ColumnNames = metadata.ColumnNames;
return existingValue;
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs
index 572e93d..27b9200 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs
@@ -2,6 +2,7 @@
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Options;
+using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.IoTDB.Provider
{
@@ -9,25 +10,29 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
/// 实现带缓存的Session工厂
///
- public class IoTDBSessionFactory : IIoTDBSessionFactory
+ public class IoTDbSessionFactory : IIoTDbSessionFactory, ISingletonDependency
{
- private readonly IoTDBOptions _options;
- private readonly ConcurrentDictionary _pools = new();
+ private readonly IoTDbOptions _options;
+ private readonly ConcurrentDictionary _pools = new();
private bool _disposed;
- public IoTDBSessionFactory(IOptions options)
+ ///
+ /// IoTDbSessionFactory
+ ///
+ ///
+ public IoTDbSessionFactory(IOptions options)
{
_options = options.Value;
}
- public IIoTDBSessionPool GetSessionPool(bool useTableSession)
+ public IIoTDbSessionPool GetSessionPool(bool useTableSession)
{
- if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory));
+ if (_disposed) throw new ObjectDisposedException(nameof(IoTDbSessionFactory));
return _pools.GetOrAdd(useTableSession, key =>
{
var pool = key
- ? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options)
+ ? (IIoTDbSessionPool)new TableSessionPoolAdapter(_options)
: new SessionPoolAdapter(_options);
pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ;
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
index 44692bd..dd04f60 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
@@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
/// 树模型连接池
///
- public class SessionPoolAdapter : IIoTDBSessionPool
+ public class SessionPoolAdapter : IIoTDbSessionPool
{
private readonly SessionPool _sessionPool;
- private readonly IoTDBOptions _options;
+ private readonly IoTDbOptions _options;
- public SessionPoolAdapter(IoTDBOptions options)
+ ///
+ /// SessionPoolAdapter
+ ///
+ ///
+ public SessionPoolAdapter(IoTDbOptions options)
{
_options = options;
_sessionPool = new SessionPool.Builder()
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
index 1efd04f..be42ad7 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
@@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
/// 表模型Session连接池
///
- public class TableSessionPoolAdapter : IIoTDBSessionPool
+ public class TableSessionPoolAdapter : IIoTDbSessionPool
{
private readonly TableSessionPool _sessionPool;
- private readonly IoTDBOptions _options;
+ private readonly IoTDbOptions _options;
- public TableSessionPoolAdapter(IoTDBOptions options)
+ ///
+ /// TableSessionPoolAdapter
+ ///
+ ///
+ public TableSessionPoolAdapter(IoTDbOptions options)
{
_options = options;
_sessionPool = new TableSessionPool.Builder()
diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
index 59e34fa..66643a5 100644
--- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
+++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs
@@ -1,204 +1,190 @@
using Confluent.Kafka;
-using Microsoft.Extensions.Configuration;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
using Confluent.Kafka.Admin;
+using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
-namespace JiShe.CollectBus.Kafka.AdminClient
+namespace JiShe.CollectBus.Kafka.AdminClient;
+
+public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency
{
- public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency
+ private readonly ILogger _logger;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ ///
+ public AdminClientService(IConfiguration configuration, ILogger logger)
{
+ _logger = logger;
+ Instance = GetInstance(configuration);
+ }
- private readonly ILogger _logger;
+ ///
+ /// Gets or sets the instance.
+ ///
+ ///
+ /// The instance.
+ ///
+ public IAdminClient Instance { get; set; }
- ///
- /// Initializes a new instance of the class.
- ///
- /// The configuration.
- /// The logger.
- public AdminClientService(IConfiguration configuration, ILogger logger)
+ ///
+ /// 创建Kafka主题
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
+ {
+ try
{
- _logger = logger;
- GetInstance(configuration);
- }
+ if (await CheckTopicAsync(topic)) return;
- ///
- /// Gets or sets the instance.
- ///
- ///
- /// The instance.
- ///
- public IAdminClient Instance { get; set; } = default;
- ///
- /// Gets the instance.
- ///
- /// The configuration.
- ///
- public IAdminClient GetInstance(IConfiguration configuration)
- {
- ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
- var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
- var adminClientConfig = new AdminClientConfig()
+ await Instance.CreateTopicsAsync(new[]
{
- BootstrapServers = configuration["Kafka:BootstrapServers"],
- };
- if (enableAuthorization)
- {
- adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
- adminClientConfig.SaslMechanism = SaslMechanism.Plain;
- adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
- adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
- }
- Instance = new AdminClientBuilder(adminClientConfig).Build();
- return Instance;
- }
-
- ///
- /// Checks the topic asynchronous.
- ///
- /// The topic.
- ///
- public async Task CheckTopicAsync(string topic)
- {
- var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
- return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
- }
-
- ///
- /// 判断Kafka主题是否存在
- ///
- /// 主题名称
- /// 副本数量,不能高于Brokers数量
- ///
- public async Task CheckTopicAsync(string topic,int numPartitions)
- {
- var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
- if(numPartitions > metadata.Brokers.Count)
- {
- throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ;
- }
-
- return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
- }
-
- ////
- /// 创建Kafka主题
- ///
- /// 主题名称
- /// 主题分区数量
- /// 副本数量,不能高于Brokers数量
- ///
- public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor)
- {
-
- try
- {
- if (await CheckTopicAsync(topic)) return;
-
-
- await Instance.CreateTopicsAsync(new[]
+ new TopicSpecification
{
- new TopicSpecification
- {
- Name = topic,
- NumPartitions = numPartitions,
- ReplicationFactor = replicationFactor
- }
- });
- }
- catch (CreateTopicsException e)
- {
- if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
- {
- throw;
+ Name = topic,
+ NumPartitions = numPartitions,
+ ReplicationFactor = replicationFactor
}
- }
+ });
}
-
- ///
- /// 删除Kafka主题
- ///
- ///
- ///
- public async Task DeleteTopicAsync(string topic)
+ catch (CreateTopicsException e)
{
- await Instance.DeleteTopicsAsync(new[] { topic });
- }
-
- ///
- /// 获取Kafka主题列表
- ///
- ///
- public async Task> ListTopicsAsync()
- {
- var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
- return new List(metadata.Topics.Select(t => t.Topic));
- }
-
- ///
- /// 判断Kafka主题是否存在
- ///
- ///
- ///
- public async Task TopicExistsAsync(string topic)
- {
- var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
- return metadata.Topics.Any(t => t.Topic == topic);
- }
-
- ///
- /// 检测分区是否存在
- ///
- ///
- ///
- ///
- public Dictionary CheckPartitionsExists(string topic, int[] partitions)
- {
- var result = new Dictionary();
- var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
- if (metadata.Topics.Count == 0)
- return partitions.ToDictionary(p => p, p => false);
- var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
- return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
- }
-
- ///
- /// 检测分区是否存在
- ///
- ///
- ///
- ///
- public bool CheckPartitionsExist(string topic, int targetPartition)
- {
- var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
- if (metadata.Topics.Count == 0)
- return false;
- var partitions = metadata.Topics[0].Partitions;
- return partitions.Any(p => p.PartitionId == targetPartition);
- }
-
- ///
- /// 获取主题的分区数量
- ///
- ///
- ///
- public int GetTopicPartitionsNum(string topic)
- {
- var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
- if (metadata.Topics.Count == 0)
- return 0;
- return metadata.Topics[0].Partitions.Count;
- }
-
- public void Dispose()
- {
- Instance?.Dispose();
+ if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) throw;
}
}
-}
+
+ ///
+ /// 删除Kafka主题
+ ///
+ ///
+ ///
+ public async Task DeleteTopicAsync(string topic)
+ {
+ await Instance.DeleteTopicsAsync(new[] { topic });
+ }
+
+ ///
+ /// 获取Kafka主题列表
+ ///
+ ///
+ public async Task> ListTopicsAsync()
+ {
+ var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
+ return new List(metadata.Topics.Select(t => t.Topic));
+ }
+
+ ///
+ /// 判断Kafka主题是否存在
+ ///
+ ///
+ ///
+ public async Task TopicExistsAsync(string topic)
+ {
+ var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10));
+ return metadata.Topics.Any(t => t.Topic == topic);
+ }
+
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ public Dictionary CheckPartitionsExists(string topic, int[] partitions)
+ {
+ var result = new Dictionary();
+ var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
+ if (metadata.Topics.Count == 0)
+ return partitions.ToDictionary(p => p, p => false);
+ var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet();
+ return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p));
+ }
+
+ ///
+ /// 检测分区是否存在
+ ///
+ ///
+ ///
+ ///
+ public bool CheckPartitionsExist(string topic, int targetPartition)
+ {
+ var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
+ if (metadata.Topics.Count == 0)
+ return false;
+ var partitions = metadata.Topics[0].Partitions;
+ return partitions.Any(p => p.PartitionId == targetPartition);
+ }
+
+ ///
+ /// 获取主题的分区数量
+ ///
+ ///
+ ///
+ public int GetTopicPartitionsNum(string topic)
+ {
+ var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10));
+ if (metadata.Topics.Count == 0)
+ return 0;
+ return metadata.Topics[0].Partitions.Count;
+ }
+
+ public void Dispose()
+ {
+ Instance?.Dispose();
+ }
+
+ ///
+ /// Gets the instance.
+ ///
+ /// The configuration.
+ ///
+ public IAdminClient GetInstance(IConfiguration configuration)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]);
+ var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!);
+ var adminClientConfig = new AdminClientConfig
+ {
+ BootstrapServers = configuration["Kafka:BootstrapServers"]
+ };
+ if (enableAuthorization)
+ {
+ adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext;
+ adminClientConfig.SaslMechanism = SaslMechanism.Plain;
+ adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"];
+ adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"];
+ }
+ return new AdminClientBuilder(adminClientConfig).Build();
+ }
+
+ ///
+ /// Checks the topic asynchronous.
+ ///
+ /// The topic.
+ ///
+ public async Task CheckTopicAsync(string topic)
+ {
+ var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
+ return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
+ }
+
+ ///
+ /// 判断Kafka主题是否存在
+ ///
+ /// 主题名称
+ /// 副本数量,不能高于Brokers数量
+ ///
+ public async Task CheckTopicAsync(string topic, int numPartitions)
+ {
+ var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5));
+ if (numPartitions > metadata.Brokers.Count)
+ throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。");
+
+ return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic));
+ }
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
index 598caf0..3b2cfae 100644
--- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
+++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs
@@ -1,68 +1,60 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+namespace JiShe.CollectBus.Kafka.Attributes;
-namespace JiShe.CollectBus.Kafka.Attributes
+[AttributeUsage(AttributeTargets.Method)]
+public class KafkaSubscribeAttribute : Attribute
{
- [AttributeUsage(AttributeTargets.Method)]
- public class KafkaSubscribeAttribute : Attribute
+ ///
+ /// 订阅主题
+ ///
+ ///
+ public KafkaSubscribeAttribute(string topic)
{
- ///
- /// 订阅的主题
- ///
- public string Topic { get; set; } = null!;
-
- ///
- /// 分区
- ///
- public int Partition { get; set; } = -1;
-
- ///
- /// 消费者组
- ///
- public string? GroupId { get; set; } = null;//"default"
-
- ///
- /// 任务数(默认是多少个分区多少个任务)
- /// 如设置订阅指定Partition则任务数始终为1
- ///
- public int TaskCount { get; set; } = -1;
-
- ///
- /// 批量处理数量
- ///
- public int BatchSize { get; set; } = 100;
-
- ///
- /// 是否启用批量处理
- ///
- public bool EnableBatch { get; set; } = false;
-
- ///
- /// 批次超时时间
- /// 格式:("00:05:00")
- ///
- public TimeSpan? BatchTimeout { get; set; }=null;
-
-
- ///
- /// 订阅主题
- ///
- ///
- public KafkaSubscribeAttribute(string topic)
- {
- this.Topic = topic;
- }
-
- ///
- /// 订阅主题
- ///
- public KafkaSubscribeAttribute(string topic, int partition)
- {
- this.Topic = topic;
- this.Partition = partition;
- }
+ Topic = topic;
}
-}
+
+ ///
+ /// 订阅主题
+ ///
+ public KafkaSubscribeAttribute(string topic, int partition)
+ {
+ Topic = topic;
+ Partition = partition;
+ }
+
+ ///
+ /// 订阅的主题
+ ///
+ public string Topic { get; set; } = null!;
+
+ ///
+ /// 分区
+ ///
+ public int Partition { get; set; } = -1;
+
+ ///
+ /// 消费者组
+ ///
+ public string? GroupId { get; set; } = null; //"default"
+
+ ///
+ /// 任务数(默认是多少个分区多少个任务)
+ /// 如设置订阅指定Partition则任务数始终为1
+ ///
+ public int TaskCount { get; set; } = -1;
+
+ ///
+ /// 批量处理数量
+ ///
+ public int BatchSize { get; set; } = 100;
+
+ ///
+ /// 是否启用批量处理
+ ///
+ public bool EnableBatch { get; set; } = false;
+
+ ///
+ /// 批次超时时间
+ /// 格式:("00:05:00")
+ ///
+ public TimeSpan? BatchTimeout { get; set; } = null;
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs
index 4cb2fff..a023edb 100644
--- a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs
+++ b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs
@@ -1,29 +1,22 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+namespace JiShe.CollectBus.Kafka.Attributes;
-namespace JiShe.CollectBus.Kafka.Attributes
+[AttributeUsage(AttributeTargets.Class, Inherited = false)]
+public class TopicAttribute : Attribute
{
- [AttributeUsage(AttributeTargets.Class, Inherited = false)]
- public class TopicAttribute: Attribute
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name.
+ public TopicAttribute(string name = "Default")
{
- ///
- /// Initializes a new instance of the class.
- ///
- /// The name.
- public TopicAttribute(string name = "Default")
- {
- Name = name;
- }
-
- ///
- /// Gets or sets the name.
- ///
- ///
- /// The name.
- ///
- public string Name { get; set; }
+ Name = name;
}
-}
+
+ ///
+ /// Gets or sets the name.
+ ///
+ ///
+ /// The name.
+ ///
+ public string Name { get; set; }
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
index 5093561..5313833 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs
@@ -13,15 +13,18 @@ namespace JiShe.CollectBus.Kafka.Consumer
public class ConsumerService : IConsumerService, IDisposable
{
private readonly ILogger _logger;
- private readonly IConfiguration _configuration;
private readonly ConcurrentDictionary
_consumerStore = new();
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer where TKey : notnull where TValue : class { }
- public ConsumerService(IConfiguration configuration, ILogger logger, IOptions kafkaOptionConfig)
+ ///
+ /// ConsumerService
+ ///
+ ///
+ ///
+ public ConsumerService(ILogger logger, IOptions kafkaOptionConfig)
{
- _configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
}
@@ -165,10 +168,10 @@ namespace JiShe.CollectBus.Kafka.Consumer
///
/// 订阅消息
///
- ///
///
///
///
+ ///
///
public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class
{
@@ -387,7 +390,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// 消费等待时间
public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class
{
- await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
+ await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout);
}
diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs
index 77901ef..32ade01 100644
--- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs
@@ -1,46 +1,50 @@
-using Confluent.Kafka;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+namespace JiShe.CollectBus.Kafka.Consumer;
-namespace JiShe.CollectBus.Kafka.Consumer
+public interface IConsumerService
{
- public interface IConsumerService
- {
- Task SubscribeAsync(string topic, Func> messageHandler, string? groupId=null) where TKey : notnull where TValue : class;
+ Task SubscribeAsync(string topic, Func> messageHandler,
+ string? groupId = null) where TKey : notnull where TValue : class;
- ///
- /// 订阅消息
- ///
- ///
- ///
- ///
- ///
- Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class;
+ ///
+ /// 订阅消息
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null)
+ where TValue : class;
- Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TKey : notnull where TValue : class;
+ Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId)
+ where TKey : notnull where TValue : class;
- ///
- /// 订阅消息
- ///
- ///
- ///
- ///
- ///
- ///
- Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class;
+ ///
+ /// 订阅消息
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null)
+ where TValue : class;
- Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
+ Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler,
+ string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
+ where TKey : notnull where TValue : class;
- Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class;
+ Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler,
+ string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null)
+ where TKey : notnull where TValue : class;
- Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
+ Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler,
+ string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
+ where TValue : class;
- Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class;
+ Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler,
+ string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null)
+ where TValue : class;
- void Unsubscribe() where TKey : notnull where TValue : class;
- }
-}
+ void Unsubscribe() where TKey : notnull where TValue : class;
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs
index 7b479fa..8bcdcdc 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs
@@ -1,30 +1,22 @@
using Confluent.Kafka;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-namespace JiShe.CollectBus.Kafka.Internal
+namespace JiShe.CollectBus.Kafka.Internal;
+
+///
+/// 消息头过滤器
+///
+public class HeadersFilter : Dictionary
{
///
- /// 消息头过滤器
+ /// 判断Headers是否匹配
///
- public class HeadersFilter : Dictionary
+ ///
+ ///
+ public bool Match(Headers headers)
{
- ///
- /// 判断Headers是否匹配
- ///
- ///
- ///
- public bool Match(Headers headers)
- {
- foreach (var kvp in this)
- {
- if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value))
- return false;
- }
- return true;
- }
+ foreach (var kvp in this)
+ if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value))
+ return false;
+ return true;
}
-}
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs
index 0ad450f..5345f1b 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs
@@ -1,18 +1,11 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+namespace JiShe.CollectBus.Kafka.Internal;
-namespace JiShe.CollectBus.Kafka.Internal
+///
+/// Kafka订阅者
+///
+/// 订阅者需要继承此接口并需要依赖注入,并使用标记
+///
+///
+public interface IKafkaSubscribe
{
- ///
- /// Kafka订阅者
- ///
- /// 订阅者需要继承此接口并需要依赖注入,并使用标记
- ///
- ///
- public interface IKafkaSubscribe
- {
- }
-}
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs
index ce24d69..27512f7 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs
@@ -1,21 +1,14 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+namespace JiShe.CollectBus.Kafka.Internal;
-namespace JiShe.CollectBus.Kafka.Internal
+public interface ISubscribeAck
{
- public interface ISubscribeAck
- {
- ///
- /// 是否成功标记
- ///
- bool Ack { get; set; }
+ ///
+ /// 是否成功标记
+ ///
+ bool Ack { get; set; }
- ///
- /// 消息
- ///
- string? Msg { get; set; }
- }
-}
+ ///
+ /// 消息
+ ///
+ string? Msg { get; set; }
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
index a3fb126..5f7bdf1 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs
@@ -1,68 +1,61 @@
using Confluent.Kafka;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-namespace JiShe.CollectBus.Kafka.Internal
+namespace JiShe.CollectBus.Kafka.Internal;
+
+public class KafkaOptionConfig
{
- public class KafkaOptionConfig
- {
- ///
- /// kafka地址
- ///
- public string BootstrapServers { get; set; } = null!;
+ ///
+ /// kafka地址
+ ///
+ public string BootstrapServers { get; set; } = null!;
- ///
- /// 服务器标识
- ///
- public string ServerTagName { get; set; }= "KafkaFilterKey";
+ ///
+ /// 服务器标识
+ ///
+ public string ServerTagName { get; set; } = "KafkaFilterKey";
- ///
- /// kafka主题副本数量
- ///
- public short KafkaReplicationFactor { get; set; }
+ ///
+ /// kafka主题副本数量
+ ///
+ public short KafkaReplicationFactor { get; set; }
- ///
- /// kafka主题分区数量
- ///
- public int NumPartitions { get; set; }
+ ///
+ /// kafka主题分区数量
+ ///
+ public int NumPartitions { get; set; }
- ///
- /// 是否开启过滤器
- ///
- public bool EnableFilter { get; set; }= true;
+ ///
+ /// 是否开启过滤器
+ ///
+ public bool EnableFilter { get; set; } = true;
- ///
- /// 是否开启认证
- ///
- public bool EnableAuthorization { get; set; } = false;
+ ///
+ /// 是否开启认证
+ ///
+ public bool EnableAuthorization { get; set; } = false;
- ///
- /// 安全协议
- ///
- public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext;
+ ///
+ /// 安全协议
+ ///
+ public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext;
- ///
- /// 认证方式
- ///
- public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain;
+ ///
+ /// 认证方式
+ ///
+ public SaslMechanism SaslMechanism { get; set; } = SaslMechanism.Plain;
- ///
- /// 用户名
- ///
- public string? SaslUserName { get; set; }
+ ///
+ /// 用户名
+ ///
+ public string? SaslUserName { get; set; }
- ///
- /// 密码
- ///
- public string? SaslPassword { get; set; }
+ ///
+ /// 密码
+ ///
+ public string? SaslPassword { get; set; }
- ///
- /// 首次采集时间
- ///
- public DateTime FirstCollectionTime { get; set; }
-
- }
-}
+ ///
+ /// 首次采集时间
+ ///
+ public DateTime FirstCollectionTime { get; set; }
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs
index ded11d7..a39db81 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs
@@ -1,113 +1,103 @@
-using Newtonsoft.Json;
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Linq;
+using System.Collections;
using System.Reflection;
-using System.Text;
-using System.Threading.Tasks;
-namespace JiShe.CollectBus.Kafka.Internal
+namespace JiShe.CollectBus.Kafka.Internal;
+
+///
+/// 反射辅助类
+///
+public static class ReflectionHelper
{
///
- /// 反射辅助类
+ /// 集合类型
+ /// Item1:参数类型
+ /// Item2:集合元素类型
///
- public static class ReflectionHelper
+ public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex = 0)
{
- ///
- ///集合类型
- ///Item1:参数类型
- ///Item2:集合元素类型
- ///
- public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0)
- {
- // 参数校验
- if (method == null) throw new ArgumentNullException(nameof(method));
- var parameters = method.GetParameters();
- if (parameterIndex < 0 || parameterIndex >= parameters.Length)
- throw new ArgumentOutOfRangeException(nameof(parameterIndex));
+ // 参数校验
+ if (method == null) throw new ArgumentNullException(nameof(method));
+ var parameters = method.GetParameters();
+ if (parameterIndex < 0 || parameterIndex >= parameters.Length)
+ throw new ArgumentOutOfRangeException(nameof(parameterIndex));
- ParameterInfo param = parameters[parameterIndex];
- Type paramType = param.ParameterType;
- Type? elementType = null;
+ var param = parameters[parameterIndex];
+ var paramType = param.ParameterType;
+ Type? elementType = null;
- // 判断是否是集合类型(排除字符串)
- if (paramType != typeof(string) && IsEnumerableType(paramType))
- {
- elementType = GetEnumerableElementType(paramType);
- }
+ // 判断是否是集合类型(排除字符串)
+ if (paramType != typeof(string) && IsEnumerableType(paramType))
+ elementType = GetEnumerableElementType(paramType);
- return Tuple.Create(paramType, elementType);
-
- }
+ return Tuple.Create(paramType, elementType);
+ }
- ///
- /// 判断是否是集合类型(排除字符串)
- ///
- public static bool IsEnumerableType(this Type type)
- {
- return type.IsArray
- || (type.IsGenericType && type.GetInterfaces()
- .Any(t => t.IsGenericType
- && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))
- || type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable));
- }
+ ///
+ /// 判断是否是集合类型(排除字符串)
+ ///
+ public static bool IsEnumerableType(this Type type)
+ {
+ return type.IsArray
+ || (type.IsGenericType && type.GetInterfaces()
+ .Any(t => t.IsGenericType
+ && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)))
+ || type.GetInterfaces().Any(t => t == typeof(IEnumerable));
+ }
- ///
- /// 获取集合元素的类型
- ///
- public static Type? GetEnumerableElementType(this Type type)
- {
- // 处理数组类型
- if (type.IsArray)
- return type.GetElementType();
+ ///
+ /// 获取集合元素的类型
+ ///
+ public static Type? GetEnumerableElementType(this Type type)
+ {
+ // 处理数组类型
+ if (type.IsArray)
+ return type.GetElementType();
- // 处理直接实现IEnumerable的类型(如IEnumerable本身)
- if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>))
- return type.GetGenericArguments()[0];
+ // 处理直接实现IEnumerable的类型(如IEnumerable本身)
+ if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>))
+ return type.GetGenericArguments()[0];
- // 处理通过接口实现IEnumerable的泛型集合(如List)
- var genericEnumerable = type.GetInterfaces()
- .FirstOrDefault(t => t.IsGenericType
- && t.GetGenericTypeDefinition() == typeof(IEnumerable<>));
- if (genericEnumerable != null)
- return genericEnumerable.GetGenericArguments()[0];
+ // 处理通过接口实现IEnumerable的泛型集合(如List)
+ var genericEnumerable = type.GetInterfaces()
+ .FirstOrDefault(t => t.IsGenericType
+ && t.GetGenericTypeDefinition() == typeof(IEnumerable<>));
+ if (genericEnumerable != null)
+ return genericEnumerable.GetGenericArguments()[0];
- // 处理非泛型集合类型(如 ArrayList)
- if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList))
- return typeof(ArrayList);
- // 返回null表示无法确定元素类型
- return null;
- }
+ // 处理非泛型集合类型(如 ArrayList)
+ if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList))
+ return typeof(ArrayList);
+ // 返回null表示无法确定元素类型
+ return null;
+ }
- //
- /// 判断是否使用强转换
- ///
- /// 目标类型
- ///
- public static bool IsConvertType(this Type targetType)
- {
- // 处理可空类型
- Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType;
- // 情况1:值类型或基元类型(如 int、DateTime)
- if (underlyingType.IsValueType || underlyingType.IsPrimitive)
- return true;
- // 情况2:字符串类型直接赋值
- else if (underlyingType == typeof(string))
- return true;
-
- // 情况3:枚举类型处理
- //else if (underlyingType.IsEnum)
- //{
- // if (Enum.IsDefined(underlyingType, msg))
- // {
- // convertedValue = Enum.Parse(underlyingType, msg.ToString());
- // return true;
- // }
- // return false;
- //}
- return false;
- }
+ ///
+ /// 判断是否使用强转换
+ ///
+ ///
+ ///
+ public static bool IsConvertType(this Type targetType)
+ {
+ // 处理可空类型
+ var underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType;
+ // 情况1:值类型或基元类型(如 int、DateTime)
+ if (underlyingType.IsValueType || underlyingType.IsPrimitive)
+ return true;
+ // 情况2:字符串类型直接赋值
+ if (underlyingType == typeof(string))
+ return true;
+
+ // 情况3:枚举类型处理
+ //else if (underlyingType.IsEnum)
+ //{
+ // if (Enum.IsDefined(underlyingType, msg))
+ // {
+ // convertedValue = Enum.Parse(underlyingType, msg.ToString());
+ // return true;
+ // }
+ // return false;
+ //}
+ return false;
}
}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs
index 0c016ff..d3fbcf6 100644
--- a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs
+++ b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs
@@ -1,75 +1,62 @@
-using Confluent.Kafka;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using static System.Runtime.InteropServices.JavaScript.JSType;
+namespace JiShe.CollectBus.Kafka.Internal;
-namespace JiShe.CollectBus.Kafka.Internal
+public class SubscribeResult : ISubscribeAck
{
- public class SubscribeResult: ISubscribeAck
+ ///
+ /// 是否成功
+ ///
+ public bool Ack { get; set; }
+
+ ///
+ /// 消息
+ ///
+ public string? Msg { get; set; }
+
+
+ ///
+ /// 成功
+ ///
+ /// 消息
+ public SubscribeResult Success(string? msg = null)
{
- ///
- /// 是否成功
- ///
- public bool Ack { get; set; }
-
- ///
- /// 消息
- ///
- public string? Msg { get; set; }
-
-
- ///
- /// 成功
- ///
- /// 消息
- public SubscribeResult Success(string? msg = null)
- {
- Ack = true;
- Msg = msg;
- return this;
- }
-
- ///
- /// 失败
- ///
- ///
- ///
- ///
- ///
- public SubscribeResult Fail(string? msg = null)
- {
- Msg = msg;
- Ack = false;
- return this;
- }
+ Ack = true;
+ Msg = msg;
+ return this;
}
- public static partial class SubscribeAck
+ ///
+ /// 失败
+ ///
+ ///
+ ///
+ public SubscribeResult Fail(string? msg = null)
{
-
- ///
- /// 成功
- ///
- /// 消息
- ///
- public static ISubscribeAck Success(string? msg = null)
- {
- return new SubscribeResult().Success(msg);
- }
-
-
- ///
- /// 失败
- ///
- /// 消息
- ///
- public static ISubscribeAck Fail(string? msg = null)
- {
- return new SubscribeResult().Fail(msg);
- }
+ Msg = msg;
+ Ack = false;
+ return this;
}
-
}
+
+public static class SubscribeAck
+{
+ ///
+ /// 成功
+ ///
+ /// 消息
+ ///
+ public static ISubscribeAck Success(string? msg = null)
+ {
+ return new SubscribeResult().Success(msg);
+ }
+
+
+ ///
+ /// 失败
+ ///
+ /// 消息
+ ///
+ public static ISubscribeAck Fail(string? msg = null)
+ {
+ return new SubscribeResult().Fail(msg);
+ }
+}
\ No newline at end of file
diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
similarity index 93%
rename from modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs
rename to modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
index a5bf39c..64f8dec 100644
--- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs
+++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs
@@ -8,26 +8,17 @@ using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
using Microsoft.AspNetCore.Builder;
-using Microsoft.AspNetCore.Mvc.Abstractions;
-using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using System;
using System.Collections;
-using System.Collections.Generic;
using System.ComponentModel;
-using System.Linq.Expressions;
using System.Reflection;
-using System.Text.Json;
-using System.Threading.Tasks;
-using YamlDotNet.Core.Tokens;
-using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.Kafka
{
- public static class KafkaSubcribesExtensions
+ public static class KafkaSubscribeExtensions
{
public static void UseInitKafkaTopic(this IServiceProvider provider)
@@ -36,7 +27,7 @@ namespace JiShe.CollectBus.Kafka
var kafkaAdminClient = provider.GetRequiredService();
var kafkaOptions = provider.GetRequiredService>();
- List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
+ var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
@@ -48,8 +39,6 @@ namespace JiShe.CollectBus.Kafka
///
/// 添加Kafka订阅
///
- ///
- ///
public static void UseKafkaSubscribe(this IServiceProvider provider)
{
var lifetime = provider.GetRequiredService();
@@ -57,8 +46,8 @@ namespace JiShe.CollectBus.Kafka
lifetime.ApplicationStarted.Register(() =>
{
var logger = provider.GetRequiredService>();
- int threadCount = 0;
- int topicCount = 0;
+ var threadCount = 0;
+ var topicCount = 0;
var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location);
if (string.IsNullOrWhiteSpace(assemblyPath))
{
@@ -98,6 +87,9 @@ namespace JiShe.CollectBus.Kafka
});
}
+ ///
+ /// 添加Kafka订阅
+ ///
public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly)
{
var provider = app.ApplicationServices;
@@ -134,8 +126,6 @@ namespace JiShe.CollectBus.Kafka
///
/// 构建Kafka订阅
///
- ///
- ///
private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig)
{
var subscribedMethods = subscribe.GetType().GetMethods()
@@ -169,11 +159,6 @@ namespace JiShe.CollectBus.Kafka
///
/// 启动后台消费线程
///
- ///
- ///
- ///
- ///
- ///
private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger)
{
var consumerService = provider.GetRequiredService();
@@ -225,10 +210,6 @@ namespace JiShe.CollectBus.Kafka
///
/// 处理消息
///
- ///
- ///
- ///
- ///
private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe)
{
var parameters = method.GetParameters();
@@ -351,9 +332,6 @@ namespace JiShe.CollectBus.Kafka
}
return false;
}
-
-
-
}
diff --git a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs
index becea90..8cbe665 100644
--- a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs
@@ -1,9 +1,4 @@
using Confluent.Kafka;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
namespace JiShe.CollectBus.Kafka.Producer
{
diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
index db0efd8..16499b5 100644
--- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
+++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs
@@ -23,6 +23,13 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly ConcurrentDictionary _producerCache = new();
private class KafkaProducer where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
+
+ ///
+ /// ProducerService
+ ///
+ ///
+ ///
+ ///
public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig)
{
_configuration = configuration;
diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
index 1874b25..988c9df 100644
--- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
+++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs
@@ -41,7 +41,7 @@ namespace JiShe.CollectBus;
typeof(CollectBusFreeRedisModule),
typeof(CollectBusFreeSqlModule),
typeof(CollectBusKafkaModule),
- typeof(CollectBusIoTDBModule),
+ typeof(CollectBusIoTDbModule),
typeof(CollectBusCassandraModule)
)]
public class CollectBusApplicationModule : AbpModule
diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index f915fd6..2f22697 100644
--- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -34,13 +34,13 @@ namespace JiShe.CollectBus.Samples;
public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe
{
private readonly ILogger _logger;
- private readonly IIoTDBProvider _iotDBProvider;
- private readonly IoTDBRuntimeContext _dbContext;
- private readonly IoTDBOptions _options;
+ private readonly IIoTDbProvider _iotDBProvider;
+ private readonly IoTDbRuntimeContext _dbContext;
+ private readonly IoTDbOptions _options;
private readonly IRedisDataCacheService _redisDataCacheService;
- public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options,
- IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService)
+ public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options,
+ IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService)
{
_iotDBProvider = iotDBProvider;
_options = options.Value;
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index a7f9f3c..4ff5dd6 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger _logger;
- private readonly IIoTDBProvider _dbProvider;
+ private readonly IIoTDbProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
@@ -48,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IRedisDataCacheService redisDataCacheService,
- IIoTDBProvider dbProvider,
+ IIoTDbProvider dbProvider,
IOptions kafkaOptions)
{
_logger = logger;
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index fe398a5..b8fd08b 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(
ILogger logger,
- IIoTDBProvider dbProvider,
+ IIoTDbProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordRepository,
IOptions kafkaOptions,
IProducerService producerService,
diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
index 2ac8801..8db1072 100644
--- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
+++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs
@@ -31,7 +31,7 @@ namespace JiShe.CollectBus.Subscribers
private readonly IRepository _messageReceivedLoginEventRepository;
private readonly IRepository _messageReceivedHeartbeatEventRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
- private readonly IIoTDBProvider _dbProvider;
+ private readonly IIoTDbProvider _dbProvider;
///
/// Initializes a new instance of the class.
@@ -47,7 +47,7 @@ namespace JiShe.CollectBus.Subscribers
IServiceProvider serviceProvider,
IRepository messageReceivedLoginEventRepository,
IRepository messageReceivedHeartbeatEventRepository,
- IIoTDBProvider dbProvider,
+ IIoTDbProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{
_logger = logger;