From 640fcb754dda71cfc371df7efe11ab7b6e1888e6 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Mon, 21 Apr 2025 09:54:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=8B=A6=E6=88=AA=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CassandraQueryOptimizer.cs | 156 ------------------ .../CassandraRepository.cs | 33 ++-- .../CollectBusCassandraModule.cs | 5 +- .../Extensions/ServiceCollectionExtensions.cs | 10 +- .../Extensions/SessionExtension.cs | 22 ++- .../ICassandraRepository.cs | 5 +- .../JiShe.CollectBus.Cassandra.csproj | 2 +- .../Samples/SampleDto.cs | 2 - .../CollectBusApplicationModule.cs | 26 ++- .../Interceptors/LogInterceptAttribute.cs | 9 + .../Interceptors/LogInterceptor.cs | 60 +++++++ .../Mappers/CollectBusMapping.cs | 11 +- .../Samples/TestAppService.cs | 13 +- .../Subscribers/SubscriberAppService.cs | 11 +- .../Workers/EpiCollectWorker.cs | 1 - .../IotSystems/Devices/Device.cs | 9 +- .../MessageIssueds/MessageIssued.cs | 9 +- .../Attributes/CassandraTableAttribute.cs | 16 ++ .../Attributes/IgnoreJobAttribute.cs | 13 -- .../NumericalOrderAttribute.cs | 2 +- .../Attributes/TopicNameAttribute.cs | 23 --- .../Helpers/CommonHelper.cs | 2 +- .../CassandraBaseEntity.cs | 14 ++ .../ICassandraEntity.cs | 19 +++ .../JiShe.CollectBus.Domain.Shared.csproj | 4 + web/JiShe.CollectBus.Host/appsettings.json | 5 +- 26 files changed, 220 insertions(+), 262 deletions(-) delete mode 100644 modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs create mode 100644 services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs create mode 100644 services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs rename {modules/JiShe.CollectBus.Cassandra => services/JiShe.CollectBus.Application}/Mappers/CollectBusMapping.cs (61%) delete mode 100644 shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs rename shared/JiShe.CollectBus.Common/{AttributeInfo => Attributes}/NumericalOrderAttribute.cs (92%) delete mode 100644 shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs create mode 100644 shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs create mode 100644 shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs diff --git a/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs b/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs deleted file mode 100644 index 0ea1b56..0000000 --- a/modules/JiShe.CollectBus.Cassandra/CassandraQueryOptimizer.cs +++ /dev/null @@ -1,156 +0,0 @@ -using System.Collections.Concurrent; -using Cassandra; -using Cassandra.Mapping; -using Microsoft.Extensions.Caching.Memory; -using Microsoft.Extensions.Logging; - -namespace JiShe.CollectBus.Cassandra -{ - public class CassandraQueryOptimizer - { - private readonly ISession _session; - private readonly ILogger _logger; - private readonly IMemoryCache _cache; - private readonly ConcurrentDictionary _preparedStatements; - private readonly int _batchSize; - private readonly TimeSpan _cacheExpiration; - - public CassandraQueryOptimizer( - ISession session, - ILogger logger, - IMemoryCache cache, - int batchSize = 100, - TimeSpan? cacheExpiration = null) - { - _session = session; - _logger = logger; - _cache = cache; - _preparedStatements = new ConcurrentDictionary(); - _batchSize = batchSize; - _cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5); - } - - public async Task GetOrPrepareStatementAsync(string cql) - { - return _preparedStatements.GetOrAdd(cql, key => - { - try - { - var statement = _session.Prepare(key); - _logger.LogDebug($"Prepared statement for CQL: {key}"); - return statement; - } - catch (Exception ex) - { - _logger.LogError(ex, $"Failed to prepare statement for CQL: {key}"); - throw; - } - }); - } - - public async Task ExecuteBatchAsync(IEnumerable statements) - { - var batch = new BatchStatement(); - var count = 0; - - foreach (var statement in statements) - { - batch.Add(statement); - count++; - - if (count >= _batchSize) - { - await ExecuteBatchAsync(batch); - batch = new BatchStatement(); - count = 0; - } - } - - if (count > 0) - { - await ExecuteBatchAsync(batch); - } - } - - private async Task ExecuteBatchAsync(BatchStatement batch) - { - try - { - await _session.ExecuteAsync(batch); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to execute batch statement"); - throw; - } - } - - public async Task GetOrSetFromCacheAsync(string cacheKey, Func> getData) - { - if (_cache.TryGetValue(cacheKey, out T cachedValue)) - { - _logger.LogDebug($"Cache hit for key: {cacheKey}"); - return cachedValue; - } - - var data = await getData(); - _cache.Set(cacheKey, data, _cacheExpiration); - _logger.LogDebug($"Cache miss for key: {cacheKey}, data cached"); - return data; - } - - public async Task> ExecutePagedQueryAsync( - string cql, - object[] parameters, - int pageSize = 100, - string pagingState = null) where T : class - { - var statement = await GetOrPrepareStatementAsync(cql); - var boundStatement = statement.Bind(parameters); - - if (!string.IsNullOrEmpty(pagingState)) - { - boundStatement.SetPagingState(Convert.FromBase64String(pagingState)); - } - - boundStatement.SetPageSize(pageSize); - - try - { - var result = await _session.ExecuteAsync(boundStatement); - //TODO: RETURN OBJECT - throw new NotImplementedException(); - //result.GetRows() - //return result.Select(row => row); - } - catch (Exception ex) - { - _logger.LogError(ex, $"Failed to execute paged query: {cql}"); - throw; - } - } - - public async Task BulkInsertAsync(IEnumerable items, string tableName) - { - var mapper = new Mapper(_session); - var batch = new List(); - var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})"; - - foreach (var chunk in items.Chunk(_batchSize)) - { - var statements = chunk.Select(item => - { - var props = typeof(T).GetProperties(); - var columns = string.Join(", ", props.Select(p => p.Name)); - var values = string.Join(", ", props.Select(p => "?")); - var statement = _session.Prepare(string.Format(cql, columns, values)); - return statement.Bind(props.Select(p => p.GetValue(item)).ToArray()); - }); - - batch.AddRange(statements); - } - - await ExecuteBatchAsync(batch); - } - } -} \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs b/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs index 25a51e3..1d6cc3c 100644 --- a/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs +++ b/modules/JiShe.CollectBus.Cassandra/CassandraRepository.cs @@ -1,19 +1,13 @@ -using Cassandra; -using Cassandra.Data.Linq; +using System.Linq.Expressions; using Cassandra.Mapping; using JiShe.CollectBus.Cassandra.Extensions; -using JiShe.CollectBus.Common.Attributes; -using Microsoft.AspNetCore.Http; -using System.Reflection; -using Thrift.Protocol.Entities; using Volo.Abp.Domain.Entities; -using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Cassandra { public class CassandraRepository : ICassandraRepository - where TEntity : class + where TEntity : class, ICassandraEntity { private readonly ICassandraProvider _cassandraProvider; public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) @@ -27,12 +21,29 @@ namespace JiShe.CollectBus.Cassandra public virtual async Task GetAsync(TKey id) { - return await Mapper.SingleOrDefaultAsync("WHERE id = ?", id); + return await GetAsync("WHERE id = ?", id); } - public virtual async Task> GetListAsync() + public virtual async Task GetAsync(string cql, params object[] args) { - return (await Mapper.FetchAsync()).ToList(); + return await Mapper.SingleAsync(cql, args); + } + + + public virtual async Task FirstOrDefaultAsync(TKey id) + { + return await FirstOrDefaultAsync("WHERE id = ?", id); + } + + public virtual async Task FirstOrDefaultAsync(string cql, params object[] args) + { + return await Mapper.FirstOrDefaultAsync(cql, args); + } + + + public virtual async Task?> GetListAsync(string? cql = null, params object[] args) + { + return cql.IsNullOrWhiteSpace() ? (await Mapper.FetchAsync()).ToList() : (await Mapper.FetchAsync(cql, args)).ToList(); } public virtual async Task InsertAsync(TEntity entity) diff --git a/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs index b5274f7..bf57bce 100644 --- a/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs +++ b/modules/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -1,7 +1,4 @@ -using Cassandra; -using Cassandra.Mapping; -using JiShe.CollectBus.Cassandra.Mappers; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Volo.Abp; using Volo.Abp.Autofac; using Volo.Abp.Modularity; diff --git a/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs b/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs index fe69268..c9ec0de 100644 --- a/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs +++ b/modules/JiShe.CollectBus.Cassandra/Extensions/ServiceCollectionExtensions.cs @@ -1,10 +1,4 @@ -using Autofac.Core; -using Cassandra; -using Cassandra.Mapping; -using JiShe.CollectBus.Cassandra; -using JiShe.CollectBus.Cassandra.Mappers; -using Microsoft.Extensions.Options; -using System.Reflection; +using JiShe.CollectBus.Cassandra; using Volo.Abp; using Volo.Abp.Modularity; @@ -26,8 +20,6 @@ namespace Microsoft.Extensions.DependencyInjection public static void AddCassandra(this ServiceConfigurationContext context) { context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); - context.Services.AddSingleton(new MappingConfiguration() - .Define(new CollectBusMapping())); } } } diff --git a/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs b/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs index c313d0c..dd0ff66 100644 --- a/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs +++ b/modules/JiShe.CollectBus.Cassandra/Extensions/SessionExtension.cs @@ -3,9 +3,7 @@ using System.Text; using Cassandra; using System.ComponentModel.DataAnnotations; using JiShe.CollectBus.Common.Attributes; -using Cassandra.Mapping; -using Cassandra.Data.Linq; -using Thrift.Protocol.Entities; +using Volo.Abp.Data; namespace JiShe.CollectBus.Cassandra.Extensions { @@ -16,17 +14,26 @@ namespace JiShe.CollectBus.Cassandra.Extensions var type = typeof(TEntity); var tableAttribute = type.GetCustomAttribute(); var tableName = tableAttribute?.Name ?? type.Name.ToLower(); - //var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace; var tableKeyspace = session.Keyspace; var properties = type.GetProperties(); - var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); + + // 分区键设置 + var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute() != null); if (primaryKey == null) { throw new InvalidOperationException($"No primary key defined for type {type.Name}"); } + // 集群键设置 + var clusteringKeys = properties.Where(p => p.GetCustomAttribute() != null).Select(a=>a.Name).ToList(); + var clusteringKeyCql = string.Empty; + if (clusteringKeys.Any()) + { + clusteringKeyCql = $", {string.Join(", ", clusteringKeys)}"; + } + var cql = new StringBuilder(); cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} ("); @@ -40,7 +47,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions cql.Append($"{columnName} {cqlType}, "); } cql.Length -= 2; // Remove last comma and space - cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))"); + cql.Append($", PRIMARY KEY (({primaryKey.Name.ToLower()}){clusteringKeyCql}))"); session.Execute(cql.ToString()); } @@ -61,6 +68,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions if (type == typeof(Guid)) return "uuid"; if (type == typeof(DateTimeOffset)) return "timestamp"; if (type == typeof(Byte[])) return "blob"; + if (type == typeof(ExtraPropertyDictionary)) return "map"; // 处理集合类型 if (type.IsGenericType) @@ -72,6 +80,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions return $"list<{GetCassandraType(elementType)}>"; if (genericType == typeof(HashSet<>)) return $"set<{GetCassandraType(elementType)}>"; + if (genericType == typeof(Nullable<>)) + return GetCassandraType(elementType); if (genericType == typeof(Dictionary<,>)) { var keyType = type.GetGenericArguments()[0]; diff --git a/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs b/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs index 6dd474f..5f3b862 100644 --- a/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs +++ b/modules/JiShe.CollectBus.Cassandra/ICassandraRepository.cs @@ -10,7 +10,10 @@ namespace JiShe.CollectBus.Cassandra public interface ICassandraRepository where TEntity : class { Task GetAsync(TKey id); - Task> GetListAsync(); + Task GetAsync(string cql, params object[] args); + Task FirstOrDefaultAsync(TKey id); + Task FirstOrDefaultAsync(string cql, params object[] args); + Task?> GetListAsync(string? cql = null, params object[] args); Task InsertAsync(TEntity entity); Task UpdateAsync(TEntity entity); Task DeleteAsync(TEntity entity); diff --git a/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj b/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj index edc303c..a3360e3 100644 --- a/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj +++ b/modules/JiShe.CollectBus.Cassandra/JiShe.CollectBus.Cassandra.csproj @@ -15,8 +15,8 @@ - + diff --git a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs index 6211273..12f46d1 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs @@ -4,14 +4,12 @@ using Volo.Abp.EventBus; namespace JiShe.CollectBus.Samples; [EventName("Sample.Kafka.Test")] -[TopicName("Test1")] public class SampleDto { public int Value { get; set; } } [EventName("Sample.Kafka.Test2")] -[TopicName("Test2")] public class SampleDto2 { public int Value { get; set; } diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index e5077cc..c7d7ebf 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -11,9 +11,11 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading.Tasks; +using Cassandra.Mapping; using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.IoTDB; +using JiShe.CollectBus.Mappers; using Volo.Abp; using Volo.Abp.Application; using Volo.Abp.Autofac; @@ -23,6 +25,8 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.EventBus; using Volo.Abp.Modularity; using Microsoft.Extensions.Options; +using JiShe.CollectBus.Interceptors; +using JiShe.CollectBus.Common.Attributes; namespace JiShe.CollectBus; @@ -49,7 +53,24 @@ public class CollectBusApplicationModule : AbpModule Configure(options => { options.AddMaps(validate: true); - }); + }); + + context.Services.AddSingleton(new MappingConfiguration() + .Define(new CollectBusMapping())); + + // 注册拦截器 + context.Services.OnRegistered(ctx => + { + var methods = ctx.ImplementationType.GetMethods(); + foreach (var method in methods) + { + var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true); + if (attr != null) + { + ctx.Interceptors.TryAdd(); + } + } + }); } public override async Task OnApplicationInitializationAsync( @@ -69,10 +90,9 @@ public class CollectBusApplicationModule : AbpModule //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); - var configuration = context.ServiceProvider.GetRequiredService(); var kafkaOptions = context.ServiceProvider.GetRequiredService>(); - List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); + var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); foreach (var item in topics) diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs new file mode 100644 index 0000000..a3969aa --- /dev/null +++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptAttribute.cs @@ -0,0 +1,9 @@ +using System; + +namespace JiShe.CollectBus.Interceptors +{ + [AttributeUsage(AttributeTargets.Method)] + public class LogInterceptAttribute : Attribute + { + } +} diff --git a/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs new file mode 100644 index 0000000..a796aaf --- /dev/null +++ b/services/JiShe.CollectBus.Application/Interceptors/LogInterceptor.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using Volo.Abp.DynamicProxy; + +namespace JiShe.CollectBus.Interceptors +{ + public class LogInterceptor : AbpInterceptor, ITransientDependency + { + public override async Task InterceptAsync(IAbpMethodInvocation invocation) + { + // 方法执行前的逻辑(如果需要) + + try + { + // 执行原始方法 + await invocation.ProceedAsync(); + + // 方法执行成功后,返回前的逻辑 + await OnSuccessAsync(invocation); + } + catch (Exception ex) + { + // 出现异常时的逻辑 + await OnExceptionAsync(invocation, ex); + throw; + } + finally + { + // 方法结束前一定会执行的逻辑 + await OnCompleteAsync(invocation); + } + } + + private Task OnSuccessAsync(IAbpMethodInvocation invocation) + { + // 方法执行成功后的逻辑 + // 可以访问 invocation.ReturnValue 获取返回值 + Console.WriteLine($"方法 {invocation.Method.Name} 成功执行,返回值:{invocation.ReturnValue}"); + return Task.CompletedTask; + } + + private Task OnExceptionAsync(IAbpMethodInvocation invocation, Exception ex) + { + // 方法执行异常时的逻辑 + Console.WriteLine($"方法 {invocation.Method.Name} 执行异常:{ex.Message}"); + return Task.CompletedTask; + } + + private Task OnCompleteAsync(IAbpMethodInvocation invocation) + { + // 无论成功失败,方法执行完毕后的逻辑 + Console.WriteLine($"方法 {invocation.Method.Name} 执行完毕"); + return Task.CompletedTask; + } + } +} diff --git a/modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs b/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs similarity index 61% rename from modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs rename to services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs index d07f8ea..a4d4ef4 100644 --- a/modules/JiShe.CollectBus.Cassandra/Mappers/CollectBusMapping.cs +++ b/services/JiShe.CollectBus.Application/Mappers/CollectBusMapping.cs @@ -1,13 +1,8 @@ using Cassandra.Mapping; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; -using static Cassandra.QueryTrace; -namespace JiShe.CollectBus.Cassandra.Mappers +namespace JiShe.CollectBus.Mappers { public class CollectBusMapping: Mappings { @@ -15,6 +10,8 @@ namespace JiShe.CollectBus.Cassandra.Mappers { For() .Column(e => e.Type, cm => cm.WithName("type").WithDbType()); + For() + .Column(e => e.Status, cm => cm.WithName("status").WithDbType()); } } } diff --git a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs index 2de458f..68d367c 100644 --- a/services/JiShe.CollectBus.Application/Samples/TestAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/TestAppService.cs @@ -25,6 +25,7 @@ using Volo.Abp.Domain.Repositories; using System.Diagnostics; using System.Linq; using Cassandra; +using JiShe.CollectBus.Interceptors; namespace JiShe.CollectBus.Samples; @@ -35,11 +36,10 @@ public class TestAppService : CollectBusAppService private readonly ICassandraRepository _messageReceivedCassandraRepository; private readonly ICassandraProvider _cassandraProvider; - - public TestAppService( ILogger logger, - ICassandraRepository messageReceivedCassandraRepository, ICassandraProvider cassandraProvider) + ICassandraRepository messageReceivedCassandraRepository, + ICassandraProvider cassandraProvider) { _logger = logger; _messageReceivedCassandraRepository = messageReceivedCassandraRepository; @@ -122,4 +122,11 @@ public class TestAppService : CollectBusAppService // 等待所有批处理完成 await Task.WhenAll(tasks); } + + [LogIntercept] + public async Task LogInterceptorTest(string str) + { + _logger.LogWarning(str); + return str; + } } diff --git a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 4236855..e10d99b 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -29,8 +29,6 @@ namespace JiShe.CollectBus.Subscribers private readonly IServiceProvider _serviceProvider; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; - private readonly IRepository _messageReceivedEventRepository; - private readonly IRepository _deviceRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IIoTDBProvider _dbProvider; @@ -42,15 +40,12 @@ namespace JiShe.CollectBus.Subscribers /// The service provider. /// The message received login event repository. /// The message received heartbeat event repository. - /// The message received event repository. - /// The device repository. /// The device repository. public SubscriberAppService(ILogger logger, - ITcpService tcpService, IServiceProvider serviceProvider, + ITcpService tcpService, + IServiceProvider serviceProvider, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository, - IRepository messageReceivedEventRepository, - IRepository deviceRepository, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordsRepository) { @@ -59,8 +54,6 @@ namespace JiShe.CollectBus.Subscribers _serviceProvider = serviceProvider; _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; - _messageReceivedEventRepository = messageReceivedEventRepository; - _deviceRepository = deviceRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository; _dbProvider = dbProvider; } diff --git a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs b/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs index 12f3e37..925692e 100644 --- a/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs +++ b/services/JiShe.CollectBus.Application/Workers/EpiCollectWorker.cs @@ -10,7 +10,6 @@ using Volo.Abp.Uow; namespace JiShe.CollectBus.Workers { - [IgnoreJob] public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker { private readonly ILogger _logger; diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs b/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs index 91a5ce7..8861c70 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Devices/Device.cs @@ -1,14 +1,18 @@ using System; using System.Collections.Generic; +using System.ComponentModel.DataAnnotations; using System.Linq; using System.Text; using System.Threading.Tasks; +using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Enums; +using Volo.Abp.Auditing; using Volo.Abp.Domain.Entities; +using Volo.Abp.Logging; namespace JiShe.CollectBus.IotSystems.Devices { - public class Device : AggregateRoot + public class Device : BasicAggregateRoot { /// /// Device @@ -20,6 +24,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// public Device(string number, string clientId, DateTime firstOnlineTime, DateTime lastOnlineTime, DeviceStatus status) { + Id = Guid.NewGuid(); Number = number; FirstOnlineTime = firstOnlineTime; LastOnlineTime = lastOnlineTime; @@ -30,6 +35,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// /// 集中器编号,在集中器登录时解析获取,并会更新为当前TCP连接的最新ClientId /// + [PartitionKey] public string Number { get; set; } /// @@ -55,6 +61,7 @@ namespace JiShe.CollectBus.IotSystems.Devices /// /// 设备状态 /// + [PartitionKey] public DeviceStatus Status { get; set; } /// diff --git a/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs b/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs index 072abcb..09294c1 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/MessageIssueds/MessageIssued.cs @@ -11,19 +11,14 @@ using Volo.Abp.Domain.Entities; namespace JiShe.CollectBus.IotSystems.MessageIssueds { [CassandraTable] - public class MessageIssued:IEntity + public class MessageIssued: ICassandraEntity { public string ClientId { get; set; } public byte[] Message { get; set; } public string DeviceNo { get; set; } public IssuedEventType Type { get; set; } public string MessageId { get; set; } - [Key] + [PartitionKey] public string Id { get; set; } - - public object?[] GetKeys() - { - return new object[] { Id }; - } } } diff --git a/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs index 5ee6d8f..72db97c 100644 --- a/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs +++ b/shared/JiShe.CollectBus.Common/Attributes/CassandraTableAttribute.cs @@ -21,4 +21,20 @@ namespace JiShe.CollectBus.Common.Attributes { } } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class PartitionKeyAttribute : Attribute + { + public PartitionKeyAttribute() + { + } + } + + [AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)] + public class ClusteringKeyAttribute : Attribute + { + public ClusteringKeyAttribute() + { + } + } } diff --git a/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs deleted file mode 100644 index 464d1ee..0000000 --- a/shared/JiShe.CollectBus.Common/Attributes/IgnoreJobAttribute.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.Attributes -{ - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class IgnoreJobAttribute : Attribute - { - } -} diff --git a/shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs similarity index 92% rename from shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs rename to shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs index a062f16..b337ae1 100644 --- a/shared/JiShe.CollectBus.Common/AttributeInfo/NumericalOrderAttribute.cs +++ b/shared/JiShe.CollectBus.Common/Attributes/NumericalOrderAttribute.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; -namespace JiShe.CollectBus.Common.AttributeInfo +namespace JiShe.CollectBus.Common.Attributes { /// /// 排序序号 diff --git a/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs b/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs deleted file mode 100644 index 7e404a5..0000000 --- a/shared/JiShe.CollectBus.Common/Attributes/TopicNameAttribute.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Volo.Abp.EventBus; -using Volo.Abp; - -namespace JiShe.CollectBus.Common.Attributes -{ - [AttributeUsage(AttributeTargets.Class, Inherited = false)] - public class TopicNameAttribute : Attribute - { - public virtual string Name { get; } - - public TopicNameAttribute(string name) - { - this.Name = Check.NotNullOrWhiteSpace(name, nameof(name)); - } - - public string GetName(Type eventType) => this.Name; - } -} diff --git a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 22ebef5..2474b88 100644 --- a/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/shared/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -8,7 +8,7 @@ using System.Runtime.InteropServices; using System.Security.Cryptography; using System.Text; using System.Threading.Tasks; -using JiShe.CollectBus.Common.AttributeInfo; +using JiShe.CollectBus.Common.Attributes; namespace JiShe.CollectBus.Common.Helpers { diff --git a/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs b/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs new file mode 100644 index 0000000..0e7fc58 --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs @@ -0,0 +1,14 @@ +using System; +using System.ComponentModel.DataAnnotations; +using JiShe.CollectBus.Common.Attributes; + +namespace JiShe.CollectBus +{ + public class CassandraBaseEntity: ICassandraEntity + { + /// + /// Id + /// + public TKey Id { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs b/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs new file mode 100644 index 0000000..760dd7a --- /dev/null +++ b/shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs @@ -0,0 +1,19 @@ +using System; + +namespace JiShe.CollectBus +{ + public interface ICassandraEntity + { + TKey Id { get; set; } + } + + public interface IHasCreationTime + { + DateTime CreationTime { get; set; } + } + + public interface IHasLastModificationTime + { + DateTime? LastModificationTime { get; set; } + } +} diff --git a/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj b/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj index a2747f4..874b688 100644 --- a/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj +++ b/shared/JiShe.CollectBus.Domain.Shared/JiShe.CollectBus.Domain.Shared.csproj @@ -31,4 +31,8 @@ + + + + diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index f776202..eeacc1b 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -84,7 +84,7 @@ "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus2" + "ServerTagName": "JiSheCollectBus20" }, "IoTDBOptions": { "UserName": "root", @@ -94,8 +94,7 @@ "DataBaseName": "energy", "OpenDebugMode": true, "UseTableSessionPoolByDefault": false - }, - "ServerTagName": "JiSheCollectBus3", + }, "Cassandra": { "ReplicationStrategy": { "Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下