diff --git a/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs b/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs deleted file mode 100644 index 0ea1b56..0000000 --- a/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs +++ /dev/null @@ -1,156 +0,0 @@ -using System.Collections.Concurrent; -using Cassandra; -using Cassandra.Mapping; -using Microsoft.Extensions.Caching.Memory; -using Microsoft.Extensions.Logging; - -namespace JiShe.CollectBus.Cassandra -{ - public class CassandraQueryOptimizer - { - private readonly ISession _session; - private readonly ILogger _logger; - private readonly IMemoryCache _cache; - private readonly ConcurrentDictionary _preparedStatements; - private readonly int _batchSize; - private readonly TimeSpan _cacheExpiration; - - public CassandraQueryOptimizer( - ISession session, - ILogger logger, - IMemoryCache cache, - int batchSize = 100, - TimeSpan? cacheExpiration = null) - { - _session = session; - _logger = logger; - _cache = cache; - _preparedStatements = new ConcurrentDictionary(); - _batchSize = batchSize; - _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5); - } - - public async Task GetOrPrepareStatementAsync(string cql) - { - return _preparedStatements.GetOrAdd(cql, key => - { - try - { - var statement = _session.Prepare(key); - _logger.LogDebug($"Prepared statement for CQL: {key}"); - return statement; - } - catch (Exception ex) - { - _logger.LogError(ex, $"Failed to prepare statement for CQL: {key}"); - throw; - } - }); - } - - public async Task ExecuteBatchAsync(IEnumerable statements) - { - var batch = new BatchStatement(); - var count = 0; - - foreach (var statement in statements) - { - batch.Add(statement); - count++; - - if (count >= _batchSize) - { - await ExecuteBatchAsync(batch); - batch = new BatchStatement(); - count = 0; - } - } - - if (count > 0) - { - await ExecuteBatchAsync(batch); - } - } - - private async Task ExecuteBatchAsync(BatchStatement batch) - { - try - { - await _session.ExecuteAsync(batch); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to execute batch statement"); - throw; - } - } - - public async Task GetOrSetFromCacheAsync(string cacheKey, Func> getData) - { - if (_cache.TryGetValue(cacheKey, out T cachedValue)) - { - _logger.LogDebug($"Cache hit for key: {cacheKey}"); - return cachedValue; - } - - var data = await getData(); - _cache.Set(cacheKey, data, _cacheExpiration); - _logger.LogDebug($"Cache miss for key: {cacheKey}, data cached"); - return data; - } - - public async Task> ExecutePagedQueryAsync( - string cql, - object[] parameters, - int pageSize = 100, - string pagingState = null) where T : class - { - var statement = await GetOrPrepareStatementAsync(cql); - var boundStatement = statement.Bind(parameters); - - if (!string.IsNullOrEmpty(pagingState)) - { - boundStatement.SetPagingState(Convert.FromBase64String(pagingState)); - } - - boundStatement.SetPageSize(pageSize); - - try - { - var result = await _session.ExecuteAsync(boundStatement); - //TODO: RETURN OBJECT - throw new NotImplementedException(); - //result.GetRows() - //return result.Select(row => row); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Failed to execute paged query: {cql}"); - throw; - } - } - - public async Task BulkInsertAsync(IEnumerable items, string tableName) - { - var mapper = new Mapper(_session); - var batch = new List(); - var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})"; - - foreach (var chunk in items.Chunk(_batchSize)) - { - var statements = chunk.Select(item => - { - var props = typeof(T).GetProperties(); - var columns = string.Join(", ", props.Select(p => p.Name)); - var values = string.Join(", ", props.Select(p => "?")); - var statement = _session.Prepare(string.Format(cql, columns, values)); - return statement.Bind(props.Select(p => p.GetValue(item)).ToArray()); - }); - - batch.AddRange(statements); - } - - await ExecuteBatchAsync(batch); - } - } -} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs index 25a51e3..1d6cc3c 100644 --- a/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs +++ b/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs @@ -1,19 +1,13 @@ -using Cassandra; -using Cassandra.Data.Linq; +using System.Linq.Expressions; using Cassandra.Mapping; using JiShe.CollectBus.Cassandra.Extensions; -using JiShe.CollectBus.Common.Attributes; -using Microsoft.AspNetCore.Http; -using System.Reflection; -using Thrift.Protocol.Entities; using Volo.Abp.Domain.Entities; -using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Cassandra { public class CassandraRepository : ICassandraRepository - where TEntity : class + where TEntity : class, ICassandraEntity { private readonly ICassandraProvider _cassandraProvider; public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) @@ -27,12 +21,29 @@ namespace JiShe.CollectBus.Cassandra public virtual async Task GetAsync(TKey id) { - return await Mapper.SingleOrDefaultAsync("WHERE id = ?", id); + return await GetAsync("WHERE id = ?", id); } - public virtual async Task> GetListAsync() + public virtual async Task GetAsync(string cql, params object[] args) { - return (await Mapper.FetchAsync()).ToList(); + return await Mapper.SingleAsync(cql, args); + } + + + public virtual async Task FirstOrDefaultAsync(TKey id) + { + return await FirstOrDefaultAsync("WHERE id = ?", id); + } + + public virtual async Task FirstOrDefaultAsync(string cql, params object[] args) + { + return await Mapper.FirstOrDefaultAsync(cql, args); + } + + + public virtual async Task?> GetListAsync(string? cql = null, params object[] args) + { + return cql.IsNullOrWhiteSpace() ? (await Mapper.FetchAsync()).ToList() : (await Mapper.FetchAsync(cql, args)).ToList(); } public virtual async Task InsertAsync(TEntity entity) diff --git a/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs index b5274f7..bf57bce 100644 --- a/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs +++ b/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -1,7 +1,4 @@ -using Cassandra; -using Cassandra.Mapping; -using JiShe.CollectBus.Cassandra.Mappers; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Volo.Abp; using Volo.Abp.Autofac; using Volo.Abp.Modularity; diff --git a/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs index fe69268..c9ec0de 100644 --- a/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs +++ b/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs @@ -1,10 +1,4 @@ -using Autofac.Core; -using Cassandra; -using Cassandra.Mapping; -using JiShe.CollectBus.Cassandra; -using JiShe.CollectBus.Cassandra.Mappers; -using Microsoft.Extensions.Options; -using System.Reflection; +using JiShe.CollectBus.Cassandra; using Volo.Abp; using Volo.Abp.Modularity; @@ -26,8 +20,6 @@ namespace Microsoft.Extensions.DependencyInjection public static void AddCassandra(this ServiceConfigurationContext context) { context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); - context.Services.AddSingleton(new MappingConfiguration() - .Define(new CollectBusMapping())); } } } diff --git a/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs index c313d0c..dd0ff66 100644 --- a/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs +++ b/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs @@ -3,9 +3,7 @@ using System.Text; using Cassandra; using System.ComponentModel.DataAnnotations; using JiShe.CollectBus.Common.Attributes; -using Cassandra.Mapping; -using Cassandra.Data.Linq; -using Thrift.Protocol.Entities; +using Volo.Abp.Data; namespace JiShe.CollectBus.Cassandra.Extensions { @@ -16,17 +14,26 @@ namespace JiShe.CollectBus.Cassandra.Extensions var type = typeof(TEntity); var tableAttribute = type.GetCustomAttribute(); var tableName = tableAttribute?.Name ?? type.Name.ToLower(); - //var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; var tableKeyspace = session.Keyspace; var properties = type.GetProperties(); - var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); + + // 分区键设置 + var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); if (primaryKey == null) { throw new InvalidOperationException($"No primary key defined for type {type.Name}"); } + // 集群键设置 + var clusteringKeys = properties.Where(p => p.GetCustomAttribute() != null).Select(a=>a.Name).ToList(); + var clusteringKeyCql = string.Empty; + if (clusteringKeys.Any()) + { + clusteringKeyCql = $", {string.Join(", ", clusteringKeys)}"; + } + var cql = new StringBuilder(); cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} ("); @@ -40,7 +47,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions cql.Append($"{columnName} {cqlType}, "); } cql.Length -= 2; // Remove last comma and space - cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))"); + cql.Append($", PRIMARY KEY (({primaryKey.Name.ToLower()}){clusteringKeyCql}))"); session.Execute(cql.ToString()); } @@ -61,6 +68,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions if (type == typeof(Guid)) return "uuid"; if (type == typeof(DateTimeOffset)) return "timestamp"; if (type == typeof(Byte[])) return "blob"; + if (type == typeof(ExtraPropertyDictionary)) return "map"; // 处理集合类型 if (type.IsGenericType) @@ -72,6 +80,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions return $"list<{GetCassandraType(elementType)}>"; if (genericType == typeof(HashSet<>)) return $"set<{GetCassandraType(elementType)}>"; + if (genericType == typeof(Nullable<>)) + return GetCassandraType(elementType); if (genericType == typeof(Dictionary<,>)) { var keyType = type.GetGenericArguments()[0]; diff --git a/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs b/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs index 6dd474f..5f3b862 100644 --- a/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs +++ b/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs @@ -10,7 +10,10 @@ namespace JiShe.CollectBus.Cassandra public interface ICassandraRepository where TEntity : class { Task GetAsync(TKey id); - Task> GetListAsync(); + Task GetAsync(string cql, params object[] args); + Task FirstOrDefaultAsync(TKey id); + Task FirstOrDefaultAsync(string cql, params object[] args); + Task?> GetListAsync(string? cql = null, params object[] args); Task InsertAsync(TEntity entity); Task UpdateAsync(TEntity entity); Task DeleteAsync(TEntity entity); diff --git a/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj index edc303c..a3360e3 100644 --- a/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj +++ b/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj @@ -15,8 +15,8 @@ - + diff --git a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs index 93bbbfe..2cce113 100644 --- a/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs +++ b/modules/JiShe.CollectBus.IoTDB/CollectBusIoTDBModule.cs @@ -1,33 +1,22 @@ using JiShe.CollectBus.IoTDB.Context; -using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; -using JiShe.CollectBus.IoTDB.Provider; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Volo.Abp.Modularity; -namespace JiShe.CollectBus.IoTDB +namespace JiShe.CollectBus.IoTDB; + +/// +/// CollectBusIoTDBModule +/// +public class CollectBusIoTDbModule : AbpModule { - public class CollectBusIoTDBModule : AbpModule + public override void ConfigureServices(ServiceConfigurationContext context) { - public override void ConfigureServices(ServiceConfigurationContext context) - { - - var configuration = context.Services.GetConfiguration(); - Configure(options => - { - configuration.GetSection(nameof(IoTDBOptions)).Bind(options); - }); + var configuration = context.Services.GetConfiguration(); + Configure(options => { configuration.GetSection(nameof(IoTDbOptions)).Bind(options); }); - // 注册上下文为Scoped - context.Services.AddScoped(); - - // 注册Session工厂 - context.Services.AddSingleton(); - - // 注册Provider - context.Services.AddScoped(); - - } + // 注册上下文为Scoped + context.Services.AddScoped(); } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs index f321fa0..afad488 100644 --- a/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs +++ b/modules/JiShe.CollectBus.IoTDB/Context/IoTDBRuntimeContext.cs @@ -6,11 +6,11 @@ namespace JiShe.CollectBus.IoTDB.Context /// /// IoTDB SessionPool 运行时上下文 /// - public class IoTDBRuntimeContext + public class IoTDbRuntimeContext { private readonly bool _defaultValue; - public IoTDBRuntimeContext(IOptions options) + public IoTDbRuntimeContext(IOptions options) { _defaultValue = options.Value.UseTableSessionPoolByDefault; UseTableSessionPool = _defaultValue; diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs index bb47841..02ac3ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBProvider.cs @@ -7,7 +7,7 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置 /// - public interface IIoTDBProvider + public interface IIoTDbProvider { ///// ///// 切换 SessionPool diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs index 03cd4a6..c2337ea 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionFactory.cs @@ -3,8 +3,8 @@ /// /// Session 工厂接口 /// - public interface IIoTDBSessionFactory:IDisposable + public interface IIoTDbSessionFactory:IDisposable { - IIoTDBSessionPool GetSessionPool(bool useTableSession); + IIoTDbSessionPool GetSessionPool(bool useTableSession); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs index 026a83a..9587549 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs @@ -5,7 +5,7 @@ namespace JiShe.CollectBus.IoTDB.Interface /// /// Session 连接池 /// - public interface IIoTDBSessionPool : IDisposable + public interface IIoTDbSessionPool : IDisposable { /// /// 打开连接池 diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index c9c0610..69463ba 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -3,7 +3,7 @@ /// /// IOTDB配置 /// - public class IoTDBOptions + public class IoTDbOptions { /// /// 数据库名称,表模型才有,树模型为空 diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 5b4e2a7..fdb9789 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -17,20 +17,26 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// IoTDB数据源 /// - public class IoTDBProvider : IIoTDBProvider + public class IoTDbProvider : IIoTDbProvider, IScopedDependency { - private static readonly ConcurrentDictionary _metadataCache = new(); - private readonly ILogger _logger; - private readonly IIoTDBSessionFactory _sessionFactory; - private readonly IoTDBRuntimeContext _runtimeContext; + private static readonly ConcurrentDictionary MetadataCache = new(); + private readonly ILogger _logger; + private readonly IIoTDbSessionFactory _sessionFactory; + private readonly IoTDbRuntimeContext _runtimeContext; - private IIoTDBSessionPool CurrentSession => + private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); - public IoTDBProvider( - ILogger logger, - IIoTDBSessionFactory sessionFactory, - IoTDBRuntimeContext runtimeContext) + /// + /// IoTDbProvider + /// + /// + /// + /// + public IoTDbProvider( + ILogger logger, + IIoTDbSessionFactory sessionFactory, + IoTDbRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs index 572e93d..27b9200 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBSessionFactory.cs @@ -2,6 +2,7 @@ using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Options; +using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.IoTDB.Provider { @@ -9,25 +10,29 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 实现带缓存的Session工厂 /// - public class IoTDBSessionFactory : IIoTDBSessionFactory + public class IoTDbSessionFactory : IIoTDbSessionFactory, ISingletonDependency { - private readonly IoTDBOptions _options; - private readonly ConcurrentDictionary _pools = new(); + private readonly IoTDbOptions _options; + private readonly ConcurrentDictionary _pools = new(); private bool _disposed; - public IoTDBSessionFactory(IOptions options) + /// + /// IoTDbSessionFactory + /// + /// + public IoTDbSessionFactory(IOptions options) { _options = options.Value; } - public IIoTDBSessionPool GetSessionPool(bool useTableSession) + public IIoTDbSessionPool GetSessionPool(bool useTableSession) { - if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory)); + if (_disposed) throw new ObjectDisposedException(nameof(IoTDbSessionFactory)); return _pools.GetOrAdd(useTableSession, key => { var pool = key - ? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options) + ? (IIoTDbSessionPool)new TableSessionPoolAdapter(_options) : new SessionPoolAdapter(_options); pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ; diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index 44692bd..dd04f60 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 树模型连接池 /// - public class SessionPoolAdapter : IIoTDBSessionPool + public class SessionPoolAdapter : IIoTDbSessionPool { private readonly SessionPool _sessionPool; - private readonly IoTDBOptions _options; + private readonly IoTDbOptions _options; - public SessionPoolAdapter(IoTDBOptions options) + /// + /// SessionPoolAdapter + /// + /// + public SessionPoolAdapter(IoTDbOptions options) { _options = options; _sessionPool = new SessionPool.Builder() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index 1efd04f..be42ad7 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -9,12 +9,16 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// 表模型Session连接池 /// - public class TableSessionPoolAdapter : IIoTDBSessionPool + public class TableSessionPoolAdapter : IIoTDbSessionPool { private readonly TableSessionPool _sessionPool; - private readonly IoTDBOptions _options; + private readonly IoTDbOptions _options; - public TableSessionPoolAdapter(IoTDBOptions options) + /// + /// TableSessionPoolAdapter + /// + /// + public TableSessionPoolAdapter(IoTDbOptions options) { _options = options; _sessionPool = new TableSessionPool.Builder() diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs index 59e34fa..66643a5 100644 --- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs +++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs @@ -1,204 +1,190 @@ using Confluent.Kafka; -using Microsoft.Extensions.Configuration; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Confluent.Kafka.Admin; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; -namespace JiShe.CollectBus.Kafka.AdminClient +namespace JiShe.CollectBus.Kafka.AdminClient; + +public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency { - public class AdminClientService : IAdminClientService, IDisposable,ISingletonDependency + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// + /// + public AdminClientService(IConfiguration configuration, ILogger logger) { + _logger = logger; + Instance = GetInstance(configuration); + } - private readonly ILogger _logger; + /// + /// Gets or sets the instance. + /// + /// + /// The instance. + /// + public IAdminClient Instance { get; set; } - /// - /// Initializes a new instance of the class. - /// - /// The configuration. - /// The logger. - public AdminClientService(IConfiguration configuration, ILogger logger) + /// + /// 创建Kafka主题 + /// + /// + /// + /// + /// + public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) + { + try { - _logger = logger; - GetInstance(configuration); - } + if (await CheckTopicAsync(topic)) return; - /// - /// Gets or sets the instance. - /// - /// - /// The instance. - /// - public IAdminClient Instance { get; set; } = default; - /// - /// Gets the instance. - /// - /// The configuration. - /// - public IAdminClient GetInstance(IConfiguration configuration) - { - ArgumentNullException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); - var adminClientConfig = new AdminClientConfig() + await Instance.CreateTopicsAsync(new[] { - BootstrapServers = configuration["Kafka:BootstrapServers"], - }; - if (enableAuthorization) - { - adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; - adminClientConfig.SaslMechanism = SaslMechanism.Plain; - adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; - } - Instance = new AdminClientBuilder(adminClientConfig).Build(); - return Instance; - } - - /// - /// Checks the topic asynchronous. - /// - /// The topic. - /// - public async Task CheckTopicAsync(string topic) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); - return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); - } - - /// - /// 判断Kafka主题是否存在 - /// - /// 主题名称 - /// 副本数量,不能高于Brokers数量 - /// - public async Task CheckTopicAsync(string topic,int numPartitions) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); - if(numPartitions > metadata.Brokers.Count) - { - throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ; - } - - return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); - } - - //// - /// 创建Kafka主题 - /// - /// 主题名称 - /// 主题分区数量 - /// 副本数量,不能高于Brokers数量 - /// - public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) - { - - try - { - if (await CheckTopicAsync(topic)) return; - - - await Instance.CreateTopicsAsync(new[] + new TopicSpecification { - new TopicSpecification - { - Name = topic, - NumPartitions = numPartitions, - ReplicationFactor = replicationFactor - } - }); - } - catch (CreateTopicsException e) - { - if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) - { - throw; + Name = topic, + NumPartitions = numPartitions, + ReplicationFactor = replicationFactor } - } + }); } - - /// - /// 删除Kafka主题 - /// - /// - /// - public async Task DeleteTopicAsync(string topic) + catch (CreateTopicsException e) { - await Instance.DeleteTopicsAsync(new[] { topic }); - } - - /// - /// 获取Kafka主题列表 - /// - /// - public async Task> ListTopicsAsync() - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return new List(metadata.Topics.Select(t => t.Topic)); - } - - /// - /// 判断Kafka主题是否存在 - /// - /// - /// - public async Task TopicExistsAsync(string topic) - { - var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); - return metadata.Topics.Any(t => t.Topic == topic); - } - - /// - /// 检测分区是否存在 - /// - /// - /// - /// - public Dictionary CheckPartitionsExists(string topic, int[] partitions) - { - var result = new Dictionary(); - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return partitions.ToDictionary(p => p, p => false); - var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet(); - return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p)); - } - - /// - /// 检测分区是否存在 - /// - /// - /// - /// - public bool CheckPartitionsExist(string topic, int targetPartition) - { - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return false; - var partitions = metadata.Topics[0].Partitions; - return partitions.Any(p => p.PartitionId == targetPartition); - } - - /// - /// 获取主题的分区数量 - /// - /// - /// - public int GetTopicPartitionsNum(string topic) - { - var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); - if (metadata.Topics.Count == 0) - return 0; - return metadata.Topics[0].Partitions.Count; - } - - public void Dispose() - { - Instance?.Dispose(); + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) throw; } } -} + + /// + /// 删除Kafka主题 + /// + /// + /// + public async Task DeleteTopicAsync(string topic) + { + await Instance.DeleteTopicsAsync(new[] { topic }); + } + + /// + /// 获取Kafka主题列表 + /// + /// + public async Task> ListTopicsAsync() + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return new List(metadata.Topics.Select(t => t.Topic)); + } + + /// + /// 判断Kafka主题是否存在 + /// + /// + /// + public async Task TopicExistsAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); + return metadata.Topics.Any(t => t.Topic == topic); + } + + /// + /// 检测分区是否存在 + /// + /// + /// + /// + public Dictionary CheckPartitionsExists(string topic, int[] partitions) + { + var result = new Dictionary(); + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return partitions.ToDictionary(p => p, p => false); + var existingPartitions = metadata.Topics[0].Partitions.Select(p => p.PartitionId).ToHashSet(); + return partitions.ToDictionary(p => p, p => existingPartitions.Contains(p)); + } + + /// + /// 检测分区是否存在 + /// + /// + /// + /// + public bool CheckPartitionsExist(string topic, int targetPartition) + { + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return false; + var partitions = metadata.Topics[0].Partitions; + return partitions.Any(p => p.PartitionId == targetPartition); + } + + /// + /// 获取主题的分区数量 + /// + /// + /// + public int GetTopicPartitionsNum(string topic) + { + var metadata = Instance.GetMetadata(topic, TimeSpan.FromSeconds(10)); + if (metadata.Topics.Count == 0) + return 0; + return metadata.Topics[0].Partitions.Count; + } + + public void Dispose() + { + Instance?.Dispose(); + } + + /// + /// Gets the instance. + /// + /// The configuration. + /// + public IAdminClient GetInstance(IConfiguration configuration) + { + ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); + var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); + var adminClientConfig = new AdminClientConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + if (enableAuthorization) + { + adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; + adminClientConfig.SaslMechanism = SaslMechanism.Plain; + adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; + adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + } + return new AdminClientBuilder(adminClientConfig).Build(); + } + + /// + /// Checks the topic asynchronous. + /// + /// The topic. + /// + public async Task CheckTopicAsync(string topic) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } + + /// + /// 判断Kafka主题是否存在 + /// + /// 主题名称 + /// 副本数量,不能高于Brokers数量 + /// + public async Task CheckTopicAsync(string topic, int numPartitions) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + if (numPartitions > metadata.Brokers.Count) + throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。"); + + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs index 598caf0..3b2cfae 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/KafkaSubscribeAttribute.cs @@ -1,68 +1,60 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Attributes; -namespace JiShe.CollectBus.Kafka.Attributes +[AttributeUsage(AttributeTargets.Method)] +public class KafkaSubscribeAttribute : Attribute { - [AttributeUsage(AttributeTargets.Method)] - public class KafkaSubscribeAttribute : Attribute + /// + /// 订阅主题 + /// + /// + public KafkaSubscribeAttribute(string topic) { - /// - /// 订阅的主题 - /// - public string Topic { get; set; } = null!; - - /// - /// 分区 - /// - public int Partition { get; set; } = -1; - - /// - /// 消费者组 - /// - public string? GroupId { get; set; } = null;//"default" - - /// - /// 任务数(默认是多少个分区多少个任务) - /// 如设置订阅指定Partition则任务数始终为1 - /// - public int TaskCount { get; set; } = -1; - - /// - /// 批量处理数量 - /// - public int BatchSize { get; set; } = 100; - - /// - /// 是否启用批量处理 - /// - public bool EnableBatch { get; set; } = false; - - /// - /// 批次超时时间 - /// 格式:("00:05:00") - /// - public TimeSpan? BatchTimeout { get; set; }=null; - - - /// - /// 订阅主题 - /// - /// - public KafkaSubscribeAttribute(string topic) - { - this.Topic = topic; - } - - /// - /// 订阅主题 - /// - public KafkaSubscribeAttribute(string topic, int partition) - { - this.Topic = topic; - this.Partition = partition; - } + Topic = topic; } -} + + /// + /// 订阅主题 + /// + public KafkaSubscribeAttribute(string topic, int partition) + { + Topic = topic; + Partition = partition; + } + + /// + /// 订阅的主题 + /// + public string Topic { get; set; } = null!; + + /// + /// 分区 + /// + public int Partition { get; set; } = -1; + + /// + /// 消费者组 + /// + public string? GroupId { get; set; } = null; //"default" + + /// + /// 任务数(默认是多少个分区多少个任务) + /// 如设置订阅指定Partition则任务数始终为1 + /// + public int TaskCount { get; set; } = -1; + + /// + /// 批量处理数量 + /// + public int BatchSize { get; set; } = 100; + + /// + /// 是否启用批量处理 + /// + public bool EnableBatch { get; set; } = false; + + /// + /// 批次超时时间 + /// 格式:("00:05:00") + /// + public TimeSpan? BatchTimeout { get; set; } = null; +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs index 4cb2fff..a023edb 100644 --- a/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs +++ b/modules/JiShe.CollectBus.Kafka/Attributes/TopicAttribute.cs @@ -1,29 +1,22 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Attributes; -namespace JiShe.CollectBus.Kafka.Attributes +[AttributeUsage(AttributeTargets.Class, Inherited = false)] +public class TopicAttribute : Attribute { - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class TopicAttribute: Attribute + /// + /// Initializes a new instance of the class. + /// + /// The name. + public TopicAttribute(string name = "Default") { - /// - /// Initializes a new instance of the class. - /// - /// The name. - public TopicAttribute(string name = "Default") - { - Name = name; - } - - /// - /// Gets or sets the name. - /// - /// - /// The name. - /// - public string Name { get; set; } + Name = name; } -} + + /// + /// Gets or sets the name. + /// + /// + /// The name. + /// + public string Name { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index 5093561..5313833 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -13,15 +13,18 @@ namespace JiShe.CollectBus.Kafka.Consumer public class ConsumerService : IConsumerService, IDisposable { private readonly ILogger _logger; - private readonly IConfiguration _configuration; private readonly ConcurrentDictionary _consumerStore = new(); private readonly KafkaOptionConfig _kafkaOptionConfig; private class KafkaConsumer where TKey : notnull where TValue : class { } - public ConsumerService(IConfiguration configuration, ILogger logger, IOptions kafkaOptionConfig) + /// + /// ConsumerService + /// + /// + /// + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig) { - _configuration = configuration; _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; } @@ -165,10 +168,10 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// 订阅消息 /// - /// /// /// /// + /// /// public async Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TValue : class { @@ -387,7 +390,7 @@ namespace JiShe.CollectBus.Kafka.Consumer /// 消费等待时间 public async Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class { - await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); + await SubscribeBatchAsync(new[] { topic }, messageBatchHandler, groupId, batchSize, batchTimeout, consumeTimeout); } diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs index 77901ef..32ade01 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/IConsumerService.cs @@ -1,46 +1,50 @@ -using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Consumer; -namespace JiShe.CollectBus.Kafka.Consumer +public interface IConsumerService { - public interface IConsumerService - { - Task SubscribeAsync(string topic, Func> messageHandler, string? groupId=null) where TKey : notnull where TValue : class; + Task SubscribeAsync(string topic, Func> messageHandler, + string? groupId = null) where TKey : notnull where TValue : class; - /// - /// 订阅消息 - /// - /// - /// - /// - /// - Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) where TValue : class; + /// + /// 订阅消息 + /// + /// + /// + /// + /// + Task SubscribeAsync(string topic, Func> messageHandler, string? groupId = null) + where TValue : class; - Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) where TKey : notnull where TValue : class; + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId) + where TKey : notnull where TValue : class; - /// - /// 订阅消息 - /// - /// - /// - /// - /// - /// - Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) where TValue : class; + /// + /// 订阅消息 + /// + /// + /// + /// + /// + /// + Task SubscribeAsync(string[] topics, Func> messageHandler, string? groupId = null) + where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) + where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) where TKey : notnull where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null) + where TKey : notnull where TValue : class; - Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string topic, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) + where TValue : class; - Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) where TValue : class; + Task SubscribeBatchAsync(string[] topics, Func, Task> messageBatchHandler, + string? groupId = null, int batchSize = 100, TimeSpan? batchTimeout = null, TimeSpan? consumeTimeout = null) + where TValue : class; - void Unsubscribe() where TKey : notnull where TValue : class; - } -} + void Unsubscribe() where TKey : notnull where TValue : class; +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs index 7b479fa..8bcdcdc 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/HeadersFilter.cs @@ -1,30 +1,22 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +/// +/// 消息头过滤器 +/// +public class HeadersFilter : Dictionary { /// - /// 消息头过滤器 + /// 判断Headers是否匹配 /// - public class HeadersFilter : Dictionary + /// + /// + public bool Match(Headers headers) { - /// - /// 判断Headers是否匹配 - /// - /// - /// - public bool Match(Headers headers) - { - foreach (var kvp in this) - { - if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value)) - return false; - } - return true; - } + foreach (var kvp in this) + if (!headers.TryGetLastBytes(kvp.Key, out var value) || !value.SequenceEqual(kvp.Value)) + return false; + return true; } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs index 0ad450f..5345f1b 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/IKafkaSubscribe.cs @@ -1,18 +1,11 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +/// +/// Kafka订阅者 +/// +/// 订阅者需要继承此接口并需要依赖注入,并使用标记 +/// +/// +public interface IKafkaSubscribe { - /// - /// Kafka订阅者 - /// - /// 订阅者需要继承此接口并需要依赖注入,并使用标记 - /// - /// - public interface IKafkaSubscribe - { - } -} +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs index ce24d69..27512f7 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ISubscribeAck.cs @@ -1,21 +1,14 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +public interface ISubscribeAck { - public interface ISubscribeAck - { - /// - /// 是否成功标记 - /// - bool Ack { get; set; } + /// + /// 是否成功标记 + /// + bool Ack { get; set; } - /// - /// 消息 - /// - string? Msg { get; set; } - } -} + /// + /// 消息 + /// + string? Msg { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index a3fb126..5f7bdf1 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -1,68 +1,61 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +public class KafkaOptionConfig { - public class KafkaOptionConfig - { - /// - /// kafka地址 - /// - public string BootstrapServers { get; set; } = null!; + /// + /// kafka地址 + /// + public string BootstrapServers { get; set; } = null!; - /// - /// 服务器标识 - /// - public string ServerTagName { get; set; }= "KafkaFilterKey"; + /// + /// 服务器标识 + /// + public string ServerTagName { get; set; } = "KafkaFilterKey"; - /// - /// kafka主题副本数量 - /// - public short KafkaReplicationFactor { get; set; } + /// + /// kafka主题副本数量 + /// + public short KafkaReplicationFactor { get; set; } - /// - /// kafka主题分区数量 - /// - public int NumPartitions { get; set; } + /// + /// kafka主题分区数量 + /// + public int NumPartitions { get; set; } - /// - /// 是否开启过滤器 - /// - public bool EnableFilter { get; set; }= true; + /// + /// 是否开启过滤器 + /// + public bool EnableFilter { get; set; } = true; - /// - /// 是否开启认证 - /// - public bool EnableAuthorization { get; set; } = false; + /// + /// 是否开启认证 + /// + public bool EnableAuthorization { get; set; } = false; - /// - /// 安全协议 - /// - public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext; + /// + /// 安全协议 + /// + public SecurityProtocol SecurityProtocol { get; set; } = SecurityProtocol.SaslPlaintext; - /// - /// 认证方式 - /// - public SaslMechanism SaslMechanism { get; set; }= SaslMechanism.Plain; + /// + /// 认证方式 + /// + public SaslMechanism SaslMechanism { get; set; } = SaslMechanism.Plain; - /// - /// 用户名 - /// - public string? SaslUserName { get; set; } + /// + /// 用户名 + /// + public string? SaslUserName { get; set; } - /// - /// 密码 - /// - public string? SaslPassword { get; set; } + /// + /// 密码 + /// + public string? SaslPassword { get; set; } - /// - /// 首次采集时间 - /// - public DateTime FirstCollectionTime { get; set; } - - } -} + /// + /// 首次采集时间 + /// + public DateTime FirstCollectionTime { get; set; } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs index ded11d7..a39db81 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/ReflectionHelper.cs @@ -1,113 +1,103 @@ -using Newtonsoft.Json; -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; +using System.Collections; using System.Reflection; -using System.Text; -using System.Threading.Tasks; -namespace JiShe.CollectBus.Kafka.Internal +namespace JiShe.CollectBus.Kafka.Internal; + +/// +/// 反射辅助类 +/// +public static class ReflectionHelper { /// - /// 反射辅助类 + /// 集合类型 + /// Item1:参数类型 + /// Item2:集合元素类型 /// - public static class ReflectionHelper + public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex = 0) { - /// - ///集合类型 - ///Item1:参数类型 - ///Item2:集合元素类型 - /// - public static Tuple GetParameterTypeInfo(this MethodInfo method, int parameterIndex=0) - { - // 参数校验 - if (method == null) throw new ArgumentNullException(nameof(method)); - var parameters = method.GetParameters(); - if (parameterIndex < 0 || parameterIndex >= parameters.Length) - throw new ArgumentOutOfRangeException(nameof(parameterIndex)); + // 参数校验 + if (method == null) throw new ArgumentNullException(nameof(method)); + var parameters = method.GetParameters(); + if (parameterIndex < 0 || parameterIndex >= parameters.Length) + throw new ArgumentOutOfRangeException(nameof(parameterIndex)); - ParameterInfo param = parameters[parameterIndex]; - Type paramType = param.ParameterType; - Type? elementType = null; + var param = parameters[parameterIndex]; + var paramType = param.ParameterType; + Type? elementType = null; - // 判断是否是集合类型(排除字符串) - if (paramType != typeof(string) && IsEnumerableType(paramType)) - { - elementType = GetEnumerableElementType(paramType); - } + // 判断是否是集合类型(排除字符串) + if (paramType != typeof(string) && IsEnumerableType(paramType)) + elementType = GetEnumerableElementType(paramType); - return Tuple.Create(paramType, elementType); - - } + return Tuple.Create(paramType, elementType); + } - /// - /// 判断是否是集合类型(排除字符串) - /// - public static bool IsEnumerableType(this Type type) - { - return type.IsArray - || (type.IsGenericType && type.GetInterfaces() - .Any(t => t.IsGenericType - && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) - || type.GetInterfaces().Any(t => t == typeof(System.Collections.IEnumerable)); - } + /// + /// 判断是否是集合类型(排除字符串) + /// + public static bool IsEnumerableType(this Type type) + { + return type.IsArray + || (type.IsGenericType && type.GetInterfaces() + .Any(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>))) + || type.GetInterfaces().Any(t => t == typeof(IEnumerable)); + } - /// - /// 获取集合元素的类型 - /// - public static Type? GetEnumerableElementType(this Type type) - { - // 处理数组类型 - if (type.IsArray) - return type.GetElementType(); + /// + /// 获取集合元素的类型 + /// + public static Type? GetEnumerableElementType(this Type type) + { + // 处理数组类型 + if (type.IsArray) + return type.GetElementType(); - // 处理直接实现IEnumerable的类型(如IEnumerable本身) - if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) - return type.GetGenericArguments()[0]; + // 处理直接实现IEnumerable的类型(如IEnumerable本身) + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IEnumerable<>)) + return type.GetGenericArguments()[0]; - // 处理通过接口实现IEnumerable的泛型集合(如List) - var genericEnumerable = type.GetInterfaces() - .FirstOrDefault(t => t.IsGenericType - && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); - if (genericEnumerable != null) - return genericEnumerable.GetGenericArguments()[0]; + // 处理通过接口实现IEnumerable的泛型集合(如List) + var genericEnumerable = type.GetInterfaces() + .FirstOrDefault(t => t.IsGenericType + && t.GetGenericTypeDefinition() == typeof(IEnumerable<>)); + if (genericEnumerable != null) + return genericEnumerable.GetGenericArguments()[0]; - // 处理非泛型集合类型(如 ArrayList) - if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) - return typeof(ArrayList); - // 返回null表示无法确定元素类型 - return null; - } + // 处理非泛型集合类型(如 ArrayList) + if (typeof(IEnumerable).IsAssignableFrom(type) && type == typeof(ArrayList)) + return typeof(ArrayList); + // 返回null表示无法确定元素类型 + return null; + } - // - /// 判断是否使用强转换 - /// - /// 目标类型 - /// - public static bool IsConvertType(this Type targetType) - { - // 处理可空类型 - Type underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; - // 情况1:值类型或基元类型(如 int、DateTime) - if (underlyingType.IsValueType || underlyingType.IsPrimitive) - return true; - // 情况2:字符串类型直接赋值 - else if (underlyingType == typeof(string)) - return true; - - // 情况3:枚举类型处理 - //else if (underlyingType.IsEnum) - //{ - // if (Enum.IsDefined(underlyingType, msg)) - // { - // convertedValue = Enum.Parse(underlyingType, msg.ToString()); - // return true; - // } - // return false; - //} - return false; - } + /// + /// 判断是否使用强转换 + /// + /// + /// + public static bool IsConvertType(this Type targetType) + { + // 处理可空类型 + var underlyingType = Nullable.GetUnderlyingType(targetType) ?? targetType; + // 情况1:值类型或基元类型(如 int、DateTime) + if (underlyingType.IsValueType || underlyingType.IsPrimitive) + return true; + // 情况2:字符串类型直接赋值 + if (underlyingType == typeof(string)) + return true; + + // 情况3:枚举类型处理 + //else if (underlyingType.IsEnum) + //{ + // if (Enum.IsDefined(underlyingType, msg)) + // { + // convertedValue = Enum.Parse(underlyingType, msg.ToString()); + // return true; + // } + // return false; + //} + return false; } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs index 0c016ff..d3fbcf6 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/SubscribeResult.cs @@ -1,75 +1,62 @@ -using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using static System.Runtime.InteropServices.JavaScript.JSType; +namespace JiShe.CollectBus.Kafka.Internal; -namespace JiShe.CollectBus.Kafka.Internal +public class SubscribeResult : ISubscribeAck { - public class SubscribeResult: ISubscribeAck + /// + /// 是否成功 + /// + public bool Ack { get; set; } + + /// + /// 消息 + /// + public string? Msg { get; set; } + + + /// + /// 成功 + /// + /// 消息 + public SubscribeResult Success(string? msg = null) { - /// - /// 是否成功 - /// - public bool Ack { get; set; } - - /// - /// 消息 - /// - public string? Msg { get; set; } - - - /// - /// 成功 - /// - /// 消息 - public SubscribeResult Success(string? msg = null) - { - Ack = true; - Msg = msg; - return this; - } - - /// - /// 失败 - /// - /// - /// - /// - /// - public SubscribeResult Fail(string? msg = null) - { - Msg = msg; - Ack = false; - return this; - } + Ack = true; + Msg = msg; + return this; } - public static partial class SubscribeAck + /// + /// 失败 + /// + /// + /// + public SubscribeResult Fail(string? msg = null) { - - /// - /// 成功 - /// - /// 消息 - /// - public static ISubscribeAck Success(string? msg = null) - { - return new SubscribeResult().Success(msg); - } - - - /// - /// 失败 - /// - /// 消息 - /// - public static ISubscribeAck Fail(string? msg = null) - { - return new SubscribeResult().Fail(msg); - } + Msg = msg; + Ack = false; + return this; } - } + +public static class SubscribeAck +{ + /// + /// 成功 + /// + /// 消息 + /// + public static ISubscribeAck Success(string? msg = null) + { + return new SubscribeResult().Success(msg); + } + + + /// + /// 失败 + /// + /// 消息 + /// + public static ISubscribeAck Fail(string? msg = null) + { + return new SubscribeResult().Fail(msg); + } +} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs similarity index 93% rename from modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs rename to modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs index a5bf39c..64f8dec 100644 --- a/modules/JiShe.CollectBus.Kafka/KafkaSubcribesExtensions.cs +++ b/modules/JiShe.CollectBus.Kafka/KafkaSubscribeExtensions.cs @@ -8,26 +8,17 @@ using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Mvc.Abstractions; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System; using System.Collections; -using System.Collections.Generic; using System.ComponentModel; -using System.Linq.Expressions; using System.Reflection; -using System.Text.Json; -using System.Threading.Tasks; -using YamlDotNet.Core.Tokens; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace JiShe.CollectBus.Kafka { - public static class KafkaSubcribesExtensions + public static class KafkaSubscribeExtensions { public static void UseInitKafkaTopic(this IServiceProvider provider) @@ -36,7 +27,7 @@ namespace JiShe.CollectBus.Kafka var kafkaAdminClient = provider.GetRequiredService(); var kafkaOptions = provider.GetRequiredService>(); - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); foreach (var item in topics) @@ -48,8 +39,6 @@ namespace JiShe.CollectBus.Kafka /// /// 添加Kafka订阅 /// - /// - /// public static void UseKafkaSubscribe(this IServiceProvider provider) { var lifetime = provider.GetRequiredService(); @@ -57,8 +46,8 @@ namespace JiShe.CollectBus.Kafka lifetime.ApplicationStarted.Register(() => { var logger = provider.GetRequiredService>(); - int threadCount = 0; - int topicCount = 0; + var threadCount = 0; + var topicCount = 0; var assemblyPath = Path.GetDirectoryName(Assembly.GetEntryAssembly()?.Location); if (string.IsNullOrWhiteSpace(assemblyPath)) { @@ -98,6 +87,9 @@ namespace JiShe.CollectBus.Kafka }); } + /// + /// 添加Kafka订阅 + /// public static void UseKafkaSubscribersAsync(this IApplicationBuilder app, Assembly assembly) { var provider = app.ApplicationServices; @@ -134,8 +126,6 @@ namespace JiShe.CollectBus.Kafka /// /// 构建Kafka订阅 /// - /// - /// private static Tuple BuildKafkaSubscribe(object subscribe, IServiceProvider provider, ILogger logger, KafkaOptionConfig kafkaOptionConfig) { var subscribedMethods = subscribe.GetType().GetMethods() @@ -169,11 +159,6 @@ namespace JiShe.CollectBus.Kafka /// /// 启动后台消费线程 /// - /// - /// - /// - /// - /// private static async Task StartConsumerAsync(IServiceProvider provider, KafkaSubscribeAttribute attr, MethodInfo method, object subscribe, ILogger logger) { var consumerService = provider.GetRequiredService(); @@ -225,10 +210,6 @@ namespace JiShe.CollectBus.Kafka /// /// 处理消息 /// - /// - /// - /// - /// private static async Task ProcessMessageAsync(List messages, MethodInfo method, object subscribe) { var parameters = method.GetParameters(); @@ -351,9 +332,6 @@ namespace JiShe.CollectBus.Kafka } return false; } - - - } diff --git a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs index becea90..8cbe665 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/IProducerService.cs @@ -1,9 +1,4 @@ using Confluent.Kafka; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace JiShe.CollectBus.Kafka.Producer { diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index db0efd8..16499b5 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -23,6 +23,13 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } private readonly KafkaOptionConfig _kafkaOptionConfig; + + /// + /// ProducerService + /// + /// + /// + /// public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig) { _configuration = configuration; diff --git a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs index 6211273..12f46d1 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs @@ -4,14 +4,12 @@ using Volo.Abp.EventBus; namespace JiShe.CollectBus.Samples; [EventName("Sample.Kafka.Test")] -[TopicName("Test1")] public class SampleDto { public int Value { get; set; } } [EventName("Sample.Kafka.Test2")] -[TopicName("Test2")] public class SampleDto2 { public int Value { get; set; } diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 37632e7..988c9df 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -11,9 +11,11 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading.Tasks; +using Cassandra.Mapping; using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.IoTDB; +using JiShe.CollectBus.Mappers; using Volo.Abp; using Volo.Abp.Application; using Volo.Abp.Autofac; @@ -24,6 +26,8 @@ using Volo.Abp.EventBus; using Volo.Abp.Modularity; using Microsoft.Extensions.Options; using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Interceptors; +using JiShe.CollectBus.Common.Attributes; namespace JiShe.CollectBus; @@ -37,7 +41,7 @@ namespace JiShe.CollectBus; typeof(CollectBusFreeRedisModule), typeof(CollectBusFreeSqlModule), typeof(CollectBusKafkaModule), - typeof(CollectBusIoTDBModule), + typeof(CollectBusIoTDbModule), typeof(CollectBusCassandraModule) )] public class CollectBusApplicationModule : AbpModule @@ -50,7 +54,24 @@ public class CollectBusApplicationModule : AbpModule Configure(options => { options.AddMaps(validate: true); - }); + }); + + context.Services.AddSingleton(new MappingConfiguration() + .Define(new CollectBusMapping())); + + // 注册拦截器 + context.Services.OnRegistered(ctx => + { + var methods = ctx.ImplementationType.GetMethods(); + foreach (var method in methods) + { + var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true); + if (attr != null) + { + ctx.Interceptors.TryAdd(); + } + } + }); } public override async Task OnApplicationInitializationAsync( @@ -70,10 +91,9 @@ public class CollectBusApplicationModule : AbpModule //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); - var configuration = context.ServiceProvider.GetRequiredService(); var kafkaOptions = context.ServiceProvider.GetRequiredService>(); - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); foreach (var item in topics) diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs new file mode 100644 index 0000000..a3969aa --- /dev/null +++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace JiShe.CollectBus.Interceptors +{ + [AttributeUsage(AttributeTargets.Method)] + public class LogInterceptAttribute : Attribute + { + } +} diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs new file mode 100644 index 0000000..a796aaf --- /dev/null +++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using Volo.Abp.DynamicProxy; + +namespace JiShe.CollectBus.Interceptors +{ + public class LogInterceptor : AbpInterceptor, ITransientDependency + { + public override async Task InterceptAsync(IAbpMethodInvocation invocation) + { + // 方法执行前的逻辑(如果需要) + + try + { + // 执行原始方法 + await invocation.ProceedAsync(); + + // 方法执行成功后,返回前的逻辑 + await OnSuccessAsync(invocation); + } + catch (Exception ex) + { + // 出现异常时的逻辑 + await OnExceptionAsync(invocation, ex); + throw; + } + finally + { + // 方法结束前一定会执行的逻辑 + await OnCompleteAsync(invocation); + } + } + + private Task OnSuccessAsync(IAbpMethodInvocation invocation) + { + // 方法执行成功后的逻辑 + // 可以访问 invocation.ReturnValue 获取返回值 + Console.WriteLine($"方法 {invocation.Method.Name} 成功执行,返回值:{invocation.ReturnValue}"); + return Task.CompletedTask; + } + + private Task OnExceptionAsync(IAbpMethodInvocation invocation, Exception ex) + { + // 方法执行异常时的逻辑 + Console.WriteLine($"方法 {invocation.Method.Name} 执行异常:{ex.Message}"); + return Task.CompletedTask; + } + + private Task OnCompleteAsync(IAbpMethodInvocation invocation) + { + // 无论成功失败,方法执行完毕后的逻辑 + Console.WriteLine($"方法 {invocation.Method.Name} 执行完毕"); + return Task.CompletedTask; + } + } +} diff --git a/modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs b/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs similarity index 61% rename from modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs rename to services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs index d07f8ea..a4d4ef4 100644 --- a/modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs +++ b/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs @@ -1,13 +1,8 @@ using Cassandra.Mapping; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; -using static Cassandra.QueryTrace; -namespace JiShe.CollectBus.Cassandra.Mappers +namespace JiShe.CollectBus.Mappers { public class CollectBusMapping: Mappings { @@ -15,6 +10,8 @@ namespace JiShe.CollectBus.Cassandra.Mappers { For() .Column(e => e.Type, cm => cm.WithName("type").WithDbType()); + For() + .Column(e => e.Status, cm => cm.WithName("status").WithDbType()); } } } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 2988b96..d729fe1 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -34,13 +34,13 @@ namespace JiShe.CollectBus.Samples; public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaSubscribe { private readonly ILogger _logger; - private readonly IIoTDBProvider _iotDBProvider; - private readonly IoTDBRuntimeContext _dbContext; - private readonly IoTDBOptions _options; + private readonly IIoTDbProvider _iotDBProvider; + private readonly IoTDbRuntimeContext _dbContext; + private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; - public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options, - IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, + IoTDbRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index 2de458f..68d367c 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -25,6 +25,7 @@ using Volo.Abp.Domain.Repositories; using System.Diagnostics; using System.Linq; using Cassandra; +using JiShe.CollectBus.Interceptors; namespace JiShe.CollectBus.Samples; @@ -35,11 +36,10 @@ public class TestAppService : CollectBusAppService private readonly ICassandraRepository _messageReceivedCassandraRepository; private readonly ICassandraProvider _cassandraProvider; - - public TestAppService( ILogger logger, - ICassandraRepository messageReceivedCassandraRepository, ICassandraProvider cassandraProvider) + ICassandraRepository messageReceivedCassandraRepository, + ICassandraProvider cassandraProvider) { _logger = logger; _messageReceivedCassandraRepository = messageReceivedCassandraRepository; @@ -122,4 +122,11 @@ public class TestAppService : CollectBusAppService // 等待所有批处理完成 await Task.WhenAll(tasks); } + + [LogIntercept] + public async Task LogInterceptorTest(string str) + { + _logger.LogWarning(str); + return str; + } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index a7f9f3c..4ff5dd6 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; - private readonly IIoTDBProvider _dbProvider; + private readonly IIoTDbProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; @@ -48,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IOptions kafkaOptions) { _logger = logger; diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index fe398a5..b8fd08b 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -37,7 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService( ILogger logger, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository, IOptions kafkaOptions, IProducerService producerService, diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 889cd91..8db1072 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -30,10 +30,8 @@ namespace JiShe.CollectBus.Subscribers private readonly IServiceProvider _serviceProvider; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; - private readonly IRepository _messageReceivedEventRepository; - private readonly IRepository _deviceRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; - private readonly IIoTDBProvider _dbProvider; + private readonly IIoTDbProvider _dbProvider; /// /// Initializes a new instance of the class. @@ -43,16 +41,13 @@ namespace JiShe.CollectBus.Subscribers /// The service provider. /// The message received login event repository. /// The message received heartbeat event repository. - /// The message received event repository. - /// The device repository. /// The device repository. public SubscriberAppService(ILogger logger, - ITcpService tcpService, IServiceProvider serviceProvider, + ITcpService tcpService, + IServiceProvider serviceProvider, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, - IRepository messageReceivedEventRepository, - IRepository deviceRepository, - IIoTDBProvider dbProvider, + IIoTDbProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordsRepository) { _logger = logger; @@ -60,8 +55,6 @@ namespace JiShe.CollectBus.Subscribers _serviceProvider = serviceProvider; _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; - _messageReceivedEventRepository = messageReceivedEventRepository; - _deviceRepository = deviceRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository; _dbProvider = dbProvider; } diff --git a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs b/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs index 12f3e37..925692e 100644 --- a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs @@ -10,7 +10,6 @@ using Volo.Abp.Uow; namespace JiShe.CollectBus.Workers { - [IgnoreJob] public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker { private readonly ILogger _logger; diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs b/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs index 91a5ce7..8861c70 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs @@ -1,14 +1,18 @@ using System; using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Text; using System.Threading.Tasks; +using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Enums; +using Volo.Abp.Auditing; using Volo.Abp.Domain.Entities; +using Volo.Abp.Logging; namespace JiShe.CollectBus.IotSystems.Devices { - public class Device : AggregateRoot + public class Device : BasicAggregateRoot { /// /// Device @@ -20,6 +24,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// public Device(string number, string clientId, DateTime firstOnlineTime, DateTime lastOnlineTime, DeviceStatus status) { + Id = Guid.NewGuid(); Number = number; FirstOnlineTime = firstOnlineTime; LastOnlineTime = lastOnlineTime; @@ -30,6 +35,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// /// 集中器编号,在集中器登录时解析获取,并会更新为当前TCP连接的最新ClientId /// + [PartitionKey] public string Number { get; set; } /// @@ -55,6 +61,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// /// 设备状态 /// + [PartitionKey] public DeviceStatus Status { get; set; } /// diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs index 072abcb..09294c1 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs @@ -11,19 +11,14 @@ using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IotSystems.MessageIssueds { [CassandraTable] - public class MessageIssued:IEntity + public class MessageIssued: ICassandraEntity { public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } public IssuedEventType Type { get; set; } public string MessageId { get; set; } - [Key] + [PartitionKey] public string Id { get; set; } - - public object?[] GetKeys() - { - return new object[] { Id }; - } } } diff --git a/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs index 5ee6d8f..72db97c 100644 --- a/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs +++ b/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs @@ -21,4 +21,20 @@ namespace JiShe.CollectBus.Common.Attributes { } } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class PartitionKeyAttribute : Attribute + { + public PartitionKeyAttribute() + { + } + } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class ClusteringKeyAttribute : Attribute + { + public ClusteringKeyAttribute() + { + } + } } diff --git a/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs deleted file mode 100644 index 464d1ee..0000000 --- a/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.Attributes -{ - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class IgnoreJobAttribute : Attribute - { - } -} diff --git a/shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs similarity index 92% rename from shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs rename to shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs index a062f16..b337ae1 100644 --- a/shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs +++ b/shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Common.AttributeInfo +namespace JiShe.CollectBus.Common.Attributes { /// /// 排序序号 diff --git a/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs deleted file mode 100644 index 7e404a5..0000000 --- a/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.EventBus; -using Volo.Abp; - -namespace JiShe.CollectBus.Common.Attributes -{ - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class TopicNameAttribute : Attribute - { - public virtual string Name { get; } - - public TopicNameAttribute(string name) - { - this.Name = Check.NotNullOrWhiteSpace(name, nameof(name)); - } - - public string GetName(Type eventType) => this.Name; - } -} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 6871a9e..3c36d23 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -8,7 +8,7 @@ using System.Runtime.InteropServices; using System.Security.Cryptography; using System.Text; using System.Threading.Tasks; -using JiShe.CollectBus.Common.AttributeInfo; +using JiShe.CollectBus.Common.Attributes; namespace JiShe.CollectBus.Common.Helpers { diff --git a/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs b/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs new file mode 100644 index 0000000..0e7fc58 --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs @@ -0,0 +1,14 @@ +using System; +using System.ComponentModel.DataAnnotations; +using JiShe.CollectBus.Common.Attributes; + +namespace JiShe.CollectBus +{ + public class CassandraBaseEntity: ICassandraEntity + { + /// + /// Id + /// + public TKey Id { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs b/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs new file mode 100644 index 0000000..760dd7a --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs @@ -0,0 +1,19 @@ +using System; + +namespace JiShe.CollectBus +{ + public interface ICassandraEntity + { + TKey Id { get; set; } + } + + public interface IHasCreationTime + { + DateTime CreationTime { get; set; } + } + + public interface IHasLastModificationTime + { + DateTime? LastModificationTime { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj b/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj index a2747f4..874b688 100644 --- a/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj +++ b/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj @@ -31,4 +31,8 @@ + + + + diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 9d1f7cb..e2ba871 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,7 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus2" + "ServerTagName": "JiSheCollectBus20" }, "IoTDBOptions": { "UserName": "root", @@ -94,8 +94,7 @@ "DataBaseName": "energy", "OpenDebugMode": true, "UseTableSessionPoolByDefault": false - }, - "ServerTagName": "JiSheCollectBus3", + }, "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下