Compare commits

...

2 Commits

Author SHA1 Message Date
cli
01aeaebb0a 合并 2025-04-21 09:56:22 +08:00
cli
640fcb754d 添加拦截器 2025-04-21 09:54:34 +08:00
26 changed files with 220 additions and 262 deletions

View File

@ -1,156 +0,0 @@
using System.Collections.Concurrent;
using Cassandra;
using Cassandra.Mapping;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.Cassandra
{
public class CassandraQueryOptimizer
{
private readonly ISession _session;
private readonly ILogger<CassandraQueryOptimizer> _logger;
private readonly IMemoryCache _cache;
private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;
private readonly int _batchSize;
private readonly TimeSpan _cacheExpiration;
public CassandraQueryOptimizer(
ISession session,
ILogger<CassandraQueryOptimizer> logger,
IMemoryCache cache,
int batchSize = 100,
TimeSpan? cacheExpiration = null)
{
_session = session;
_logger = logger;
_cache = cache;
_preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
_batchSize = batchSize;
_cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
}
public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
{
return _preparedStatements.GetOrAdd(cql, key =>
{
try
{
var statement = _session.Prepare(key);
_logger.LogDebug($"Prepared statement for CQL: {key}");
return statement;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to prepare statement for CQL: {key}");
throw;
}
});
}
public async Task ExecuteBatchAsync(IEnumerable<BoundStatement> statements)
{
var batch = new BatchStatement();
var count = 0;
foreach (var statement in statements)
{
batch.Add(statement);
count++;
if (count >= _batchSize)
{
await ExecuteBatchAsync(batch);
batch = new BatchStatement();
count = 0;
}
}
if (count > 0)
{
await ExecuteBatchAsync(batch);
}
}
private async Task ExecuteBatchAsync(BatchStatement batch)
{
try
{
await _session.ExecuteAsync(batch);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to execute batch statement");
throw;
}
}
public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
{
if (_cache.TryGetValue(cacheKey, out T cachedValue))
{
_logger.LogDebug($"Cache hit for key: {cacheKey}");
return cachedValue;
}
var data = await getData();
_cache.Set(cacheKey, data, _cacheExpiration);
_logger.LogDebug($"Cache miss for key: {cacheKey}, data cached");
return data;
}
public async Task<IEnumerable<T>> ExecutePagedQueryAsync<T>(
string cql,
object[] parameters,
int pageSize = 100,
string pagingState = null) where T : class
{
var statement = await GetOrPrepareStatementAsync(cql);
var boundStatement = statement.Bind(parameters);
if (!string.IsNullOrEmpty(pagingState))
{
boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
}
boundStatement.SetPageSize(pageSize);
try
{
var result = await _session.ExecuteAsync(boundStatement);
//TODO: RETURN OBJECT
throw new NotImplementedException();
//result.GetRows()
//return result.Select(row => row);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Failed to execute paged query: {cql}");
throw;
}
}
public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
{
var mapper = new Mapper(_session);
var batch = new List<BoundStatement>();
var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})";
foreach (var chunk in items.Chunk(_batchSize))
{
var statements = chunk.Select(item =>
{
var props = typeof(T).GetProperties();
var columns = string.Join(", ", props.Select(p => p.Name));
var values = string.Join(", ", props.Select(p => "?"));
var statement = _session.Prepare(string.Format(cql, columns, values));
return statement.Bind(props.Select(p => p.GetValue(item)).ToArray());
});
batch.AddRange(statements);
}
await ExecuteBatchAsync(batch);
}
}
}

View File

@ -1,19 +1,13 @@
using Cassandra; using System.Linq.Expressions;
using Cassandra.Data.Linq;
using Cassandra.Mapping; using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Extensions; using JiShe.CollectBus.Cassandra.Extensions;
using JiShe.CollectBus.Common.Attributes;
using Microsoft.AspNetCore.Http;
using System.Reflection;
using Thrift.Protocol.Entities;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Cassandra namespace JiShe.CollectBus.Cassandra
{ {
public class CassandraRepository<TEntity, TKey> public class CassandraRepository<TEntity, TKey>
: ICassandraRepository<TEntity, TKey> : ICassandraRepository<TEntity, TKey>
where TEntity : class where TEntity : class, ICassandraEntity<TKey>
{ {
private readonly ICassandraProvider _cassandraProvider; private readonly ICassandraProvider _cassandraProvider;
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig) public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
@ -27,12 +21,29 @@ namespace JiShe.CollectBus.Cassandra
public virtual async Task<TEntity> GetAsync(TKey id) public virtual async Task<TEntity> GetAsync(TKey id)
{ {
return await Mapper.SingleOrDefaultAsync<TEntity>("WHERE id = ?", id); return await GetAsync("WHERE id = ?", id);
} }
public virtual async Task<List<TEntity>> GetListAsync() public virtual async Task<TEntity?> GetAsync(string cql, params object[] args)
{ {
return (await Mapper.FetchAsync<TEntity>()).ToList(); return await Mapper.SingleAsync<TEntity?>(cql, args);
}
public virtual async Task<TEntity> FirstOrDefaultAsync(TKey id)
{
return await FirstOrDefaultAsync("WHERE id = ?", id);
}
public virtual async Task<TEntity?> FirstOrDefaultAsync(string cql, params object[] args)
{
return await Mapper.FirstOrDefaultAsync<TEntity>(cql, args);
}
public virtual async Task<List<TEntity>?> GetListAsync(string? cql = null, params object[] args)
{
return cql.IsNullOrWhiteSpace() ? (await Mapper.FetchAsync<TEntity>()).ToList() : (await Mapper.FetchAsync<TEntity>(cql, args)).ToList();
} }
public virtual async Task<TEntity> InsertAsync(TEntity entity) public virtual async Task<TEntity> InsertAsync(TEntity entity)

View File

@ -1,7 +1,4 @@
using Cassandra; using Microsoft.Extensions.DependencyInjection;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra.Mappers;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;

View File

@ -1,10 +1,4 @@
using Autofac.Core; using JiShe.CollectBus.Cassandra;
using Cassandra;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.Cassandra.Mappers;
using Microsoft.Extensions.Options;
using System.Reflection;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
@ -26,8 +20,6 @@ namespace Microsoft.Extensions.DependencyInjection
public static void AddCassandra(this ServiceConfigurationContext context) public static void AddCassandra(this ServiceConfigurationContext context)
{ {
context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>)); context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
context.Services.AddSingleton(new MappingConfiguration()
.Define(new CollectBusMapping()));
} }
} }
} }

View File

@ -3,9 +3,7 @@ using System.Text;
using Cassandra; using Cassandra;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using JiShe.CollectBus.Common.Attributes; using JiShe.CollectBus.Common.Attributes;
using Cassandra.Mapping; using Volo.Abp.Data;
using Cassandra.Data.Linq;
using Thrift.Protocol.Entities;
namespace JiShe.CollectBus.Cassandra.Extensions namespace JiShe.CollectBus.Cassandra.Extensions
{ {
@ -16,17 +14,26 @@ namespace JiShe.CollectBus.Cassandra.Extensions
var type = typeof(TEntity); var type = typeof(TEntity);
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>(); var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
var tableName = tableAttribute?.Name ?? type.Name.ToLower(); var tableName = tableAttribute?.Name ?? type.Name.ToLower();
//var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
var tableKeyspace = session.Keyspace; var tableKeyspace = session.Keyspace;
var properties = type.GetProperties(); var properties = type.GetProperties();
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
// 分区键设置
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<PartitionKeyAttribute>() != null);
if (primaryKey == null) if (primaryKey == null)
{ {
throw new InvalidOperationException($"No primary key defined for type {type.Name}"); throw new InvalidOperationException($"No primary key defined for type {type.Name}");
} }
// 集群键设置
var clusteringKeys = properties.Where(p => p.GetCustomAttribute<ClusteringKeyAttribute>() != null).Select(a=>a.Name).ToList();
var clusteringKeyCql = string.Empty;
if (clusteringKeys.Any())
{
clusteringKeyCql = $", {string.Join(", ", clusteringKeys)}";
}
var cql = new StringBuilder(); var cql = new StringBuilder();
cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} ("); cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} (");
@ -40,7 +47,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
cql.Append($"{columnName} {cqlType}, "); cql.Append($"{columnName} {cqlType}, ");
} }
cql.Length -= 2; // Remove last comma and space cql.Length -= 2; // Remove last comma and space
cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))"); cql.Append($", PRIMARY KEY (({primaryKey.Name.ToLower()}){clusteringKeyCql}))");
session.Execute(cql.ToString()); session.Execute(cql.ToString());
} }
@ -61,6 +68,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
if (type == typeof(Guid)) return "uuid"; if (type == typeof(Guid)) return "uuid";
if (type == typeof(DateTimeOffset)) return "timestamp"; if (type == typeof(DateTimeOffset)) return "timestamp";
if (type == typeof(Byte[])) return "blob"; if (type == typeof(Byte[])) return "blob";
if (type == typeof(ExtraPropertyDictionary)) return "map<text,text>";
// 处理集合类型 // 处理集合类型
if (type.IsGenericType) if (type.IsGenericType)
@ -72,6 +80,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions
return $"list<{GetCassandraType(elementType)}>"; return $"list<{GetCassandraType(elementType)}>";
if (genericType == typeof(HashSet<>)) if (genericType == typeof(HashSet<>))
return $"set<{GetCassandraType(elementType)}>"; return $"set<{GetCassandraType(elementType)}>";
if (genericType == typeof(Nullable<>))
return GetCassandraType(elementType);
if (genericType == typeof(Dictionary<,>)) if (genericType == typeof(Dictionary<,>))
{ {
var keyType = type.GetGenericArguments()[0]; var keyType = type.GetGenericArguments()[0];

View File

@ -10,7 +10,10 @@ namespace JiShe.CollectBus.Cassandra
public interface ICassandraRepository<TEntity, TKey> where TEntity : class public interface ICassandraRepository<TEntity, TKey> where TEntity : class
{ {
Task<TEntity> GetAsync(TKey id); Task<TEntity> GetAsync(TKey id);
Task<List<TEntity>> GetListAsync(); Task<TEntity?> GetAsync(string cql, params object[] args);
Task<TEntity> FirstOrDefaultAsync(TKey id);
Task<TEntity?> FirstOrDefaultAsync(string cql, params object[] args);
Task<List<TEntity>?> GetListAsync(string? cql = null, params object[] args);
Task<TEntity> InsertAsync(TEntity entity); Task<TEntity> InsertAsync(TEntity entity);
Task<TEntity> UpdateAsync(TEntity entity); Task<TEntity> UpdateAsync(TEntity entity);
Task DeleteAsync(TEntity entity); Task DeleteAsync(TEntity entity);

View File

@ -15,8 +15,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" /> <ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -4,14 +4,12 @@ using Volo.Abp.EventBus;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
[EventName("Sample.Kafka.Test")] [EventName("Sample.Kafka.Test")]
[TopicName("Test1")]
public class SampleDto public class SampleDto
{ {
public int Value { get; set; } public int Value { get; set; }
} }
[EventName("Sample.Kafka.Test2")] [EventName("Sample.Kafka.Test2")]
[TopicName("Test2")]
public class SampleDto2 public class SampleDto2
{ {
public int Value { get; set; } public int Value { get; set; }

View File

@ -11,9 +11,11 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using Cassandra.Mapping;
using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;
using JiShe.CollectBus.FreeRedis; using JiShe.CollectBus.FreeRedis;
using JiShe.CollectBus.IoTDB; using JiShe.CollectBus.IoTDB;
using JiShe.CollectBus.Mappers;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Application; using Volo.Abp.Application;
using Volo.Abp.Autofac; using Volo.Abp.Autofac;
@ -24,6 +26,8 @@ using Volo.Abp.EventBus;
using Volo.Abp.Modularity; using Volo.Abp.Modularity;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;
@ -51,6 +55,23 @@ public class CollectBusApplicationModule : AbpModule
{ {
options.AddMaps<CollectBusApplicationModule>(validate: true); options.AddMaps<CollectBusApplicationModule>(validate: true);
}); });
context.Services.AddSingleton(new MappingConfiguration()
.Define(new CollectBusMapping()));
// 注册拦截器
context.Services.OnRegistered(ctx =>
{
var methods = ctx.ImplementationType.GetMethods();
foreach (var method in methods)
{
var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true);
if (attr != null)
{
ctx.Interceptors.TryAdd<LogInterceptor>();
}
}
});
} }
public override async Task OnApplicationInitializationAsync( public override async Task OnApplicationInitializationAsync(
@ -70,10 +91,9 @@ public class CollectBusApplicationModule : AbpModule
//初始化主题信息 //初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>(); var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
var kafkaOptions = context.ServiceProvider.GetRequiredService<IOptions<KafkaOptionConfig>>(); var kafkaOptions = context.ServiceProvider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics) foreach (var item in topics)

View File

@ -0,0 +1,9 @@
using System;
namespace JiShe.CollectBus.Interceptors
{
[AttributeUsage(AttributeTargets.Method)]
public class LogInterceptAttribute : Attribute
{
}
}

View File

@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.DynamicProxy;
namespace JiShe.CollectBus.Interceptors
{
public class LogInterceptor : AbpInterceptor, ITransientDependency
{
public override async Task InterceptAsync(IAbpMethodInvocation invocation)
{
// 方法执行前的逻辑(如果需要)
try
{
// 执行原始方法
await invocation.ProceedAsync();
// 方法执行成功后,返回前的逻辑
await OnSuccessAsync(invocation);
}
catch (Exception ex)
{
// 出现异常时的逻辑
await OnExceptionAsync(invocation, ex);
throw;
}
finally
{
// 方法结束前一定会执行的逻辑
await OnCompleteAsync(invocation);
}
}
private Task OnSuccessAsync(IAbpMethodInvocation invocation)
{
// 方法执行成功后的逻辑
// 可以访问 invocation.ReturnValue 获取返回值
Console.WriteLine($"方法 {invocation.Method.Name} 成功执行,返回值:{invocation.ReturnValue}");
return Task.CompletedTask;
}
private Task OnExceptionAsync(IAbpMethodInvocation invocation, Exception ex)
{
// 方法执行异常时的逻辑
Console.WriteLine($"方法 {invocation.Method.Name} 执行异常:{ex.Message}");
return Task.CompletedTask;
}
private Task OnCompleteAsync(IAbpMethodInvocation invocation)
{
// 无论成功失败,方法执行完毕后的逻辑
Console.WriteLine($"方法 {invocation.Method.Name} 执行完毕");
return Task.CompletedTask;
}
}
}

View File

@ -1,13 +1,8 @@
using Cassandra.Mapping; using Cassandra.Mapping;
using System; using JiShe.CollectBus.IotSystems.Devices;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using static Cassandra.QueryTrace;
namespace JiShe.CollectBus.Cassandra.Mappers namespace JiShe.CollectBus.Mappers
{ {
public class CollectBusMapping: Mappings public class CollectBusMapping: Mappings
{ {
@ -15,6 +10,8 @@ namespace JiShe.CollectBus.Cassandra.Mappers
{ {
For<MessageIssued>() For<MessageIssued>()
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>()); .Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
For<Device>()
.Column(e => e.Status, cm => cm.WithName("status").WithDbType<int>());
} }
} }
} }

View File

@ -25,6 +25,7 @@ using Volo.Abp.Domain.Repositories;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using Cassandra; using Cassandra;
using JiShe.CollectBus.Interceptors;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -35,11 +36,10 @@ public class TestAppService : CollectBusAppService
private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository; private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
private readonly ICassandraProvider _cassandraProvider; private readonly ICassandraProvider _cassandraProvider;
public TestAppService( public TestAppService(
ILogger<TestAppService> logger, ILogger<TestAppService> logger,
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraProvider cassandraProvider) ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
ICassandraProvider cassandraProvider)
{ {
_logger = logger; _logger = logger;
_messageReceivedCassandraRepository = messageReceivedCassandraRepository; _messageReceivedCassandraRepository = messageReceivedCassandraRepository;
@ -122,4 +122,11 @@ public class TestAppService : CollectBusAppService
// 等待所有批处理完成 // 等待所有批处理完成
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
} }
[LogIntercept]
public async Task<string> LogInterceptorTest(string str)
{
_logger.LogWarning(str);
return str;
}
} }

View File

@ -30,8 +30,6 @@ namespace JiShe.CollectBus.Subscribers
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository; private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository; private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDBProvider _dbProvider; private readonly IIoTDBProvider _dbProvider;
@ -43,15 +41,12 @@ namespace JiShe.CollectBus.Subscribers
/// <param name="serviceProvider">The service provider.</param> /// <param name="serviceProvider">The service provider.</param>
/// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param> /// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param> /// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
/// <param name="messageReceivedEventRepository">The message received event repository.</param>
/// <param name="deviceRepository">The device repository.</param>
/// <param name="meterReadingRecordsRepository">The device repository.</param> /// <param name="meterReadingRecordsRepository">The device repository.</param>
public SubscriberAppService(ILogger<SubscriberAppService> logger, public SubscriberAppService(ILogger<SubscriberAppService> logger,
ITcpService tcpService, IServiceProvider serviceProvider, ITcpService tcpService,
IServiceProvider serviceProvider,
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository, IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository, IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository,
IIoTDBProvider dbProvider, IIoTDBProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository) IMeterReadingRecordRepository meterReadingRecordsRepository)
{ {
@ -60,8 +55,6 @@ namespace JiShe.CollectBus.Subscribers
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
_messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
_messageReceivedEventRepository = messageReceivedEventRepository;
_deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider; _dbProvider = dbProvider;
} }

View File

@ -10,7 +10,6 @@ using Volo.Abp.Uow;
namespace JiShe.CollectBus.Workers namespace JiShe.CollectBus.Workers
{ {
[IgnoreJob]
public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
{ {
private readonly ILogger<EpiCollectWorker> _logger; private readonly ILogger<EpiCollectWorker> _logger;

View File

@ -1,14 +1,18 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Enums; using JiShe.CollectBus.Enums;
using Volo.Abp.Auditing;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
using Volo.Abp.Logging;
namespace JiShe.CollectBus.IotSystems.Devices namespace JiShe.CollectBus.IotSystems.Devices
{ {
public class Device : AggregateRoot<Guid> public class Device : BasicAggregateRoot<Guid>
{ {
/// <summary> /// <summary>
/// Device /// Device
@ -20,6 +24,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <param name="status"></param> /// <param name="status"></param>
public Device(string number, string clientId, DateTime firstOnlineTime, DateTime lastOnlineTime, DeviceStatus status) public Device(string number, string clientId, DateTime firstOnlineTime, DateTime lastOnlineTime, DeviceStatus status)
{ {
Id = Guid.NewGuid();
Number = number; Number = number;
FirstOnlineTime = firstOnlineTime; FirstOnlineTime = firstOnlineTime;
LastOnlineTime = lastOnlineTime; LastOnlineTime = lastOnlineTime;
@ -30,6 +35,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary> /// <summary>
/// 集中器编号在集中器登录时解析获取并会更新为当前TCP连接的最新ClientId /// 集中器编号在集中器登录时解析获取并会更新为当前TCP连接的最新ClientId
/// </summary> /// </summary>
[PartitionKey]
public string Number { get; set; } public string Number { get; set; }
/// <summary> /// <summary>
@ -55,6 +61,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
/// <summary> /// <summary>
/// 设备状态 /// 设备状态
/// </summary> /// </summary>
[PartitionKey]
public DeviceStatus Status { get; set; } public DeviceStatus Status { get; set; }
/// <summary> /// <summary>

View File

@ -11,19 +11,14 @@ using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IotSystems.MessageIssueds namespace JiShe.CollectBus.IotSystems.MessageIssueds
{ {
[CassandraTable] [CassandraTable]
public class MessageIssued:IEntity<string> public class MessageIssued: ICassandraEntity<string>
{ {
public string ClientId { get; set; } public string ClientId { get; set; }
public byte[] Message { get; set; } public byte[] Message { get; set; }
public string DeviceNo { get; set; } public string DeviceNo { get; set; }
public IssuedEventType Type { get; set; } public IssuedEventType Type { get; set; }
public string MessageId { get; set; } public string MessageId { get; set; }
[Key] [PartitionKey]
public string Id { get; set; } public string Id { get; set; }
public object?[] GetKeys()
{
return new object[] { Id };
}
} }
} }

View File

@ -21,4 +21,20 @@ namespace JiShe.CollectBus.Common.Attributes
{ {
} }
} }
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
public class PartitionKeyAttribute : Attribute
{
public PartitionKeyAttribute()
{
}
}
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
public class ClusteringKeyAttribute : Attribute
{
public ClusteringKeyAttribute()
{
}
}
} }

View File

@ -1,13 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class IgnoreJobAttribute : Attribute
{
}
}

View File

@ -4,7 +4,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.AttributeInfo namespace JiShe.CollectBus.Common.Attributes
{ {
/// <summary> /// <summary>
/// 排序序号 /// 排序序号

View File

@ -1,23 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.EventBus;
using Volo.Abp;
namespace JiShe.CollectBus.Common.Attributes
{
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class TopicNameAttribute : Attribute
{
public virtual string Name { get; }
public TopicNameAttribute(string name)
{
this.Name = Check.NotNullOrWhiteSpace(name, nameof(name));
}
public string GetName(Type eventType) => this.Name;
}
}

View File

@ -8,7 +8,7 @@ using System.Runtime.InteropServices;
using System.Security.Cryptography; using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.Common.AttributeInfo; using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus.Common.Helpers namespace JiShe.CollectBus.Common.Helpers
{ {

View File

@ -0,0 +1,14 @@
using System;
using System.ComponentModel.DataAnnotations;
using JiShe.CollectBus.Common.Attributes;
namespace JiShe.CollectBus
{
public class CassandraBaseEntity<TKey>: ICassandraEntity<TKey>
{
/// <summary>
/// Id
/// </summary>
public TKey Id { get; set; }
}
}

View File

@ -0,0 +1,19 @@
using System;
namespace JiShe.CollectBus
{
public interface ICassandraEntity<TKey>
{
TKey Id { get; set; }
}
public interface IHasCreationTime
{
DateTime CreationTime { get; set; }
}
public interface IHasLastModificationTime
{
DateTime? LastModificationTime { get; set; }
}
}

View File

@ -31,4 +31,8 @@
<None Remove="JiShe.CollectBus.Domain.Shared.abppkg" /> <None Remove="JiShe.CollectBus.Domain.Shared.abppkg" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
</ItemGroup>
</Project> </Project>

View File

@ -84,7 +84,7 @@
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30, "NumPartitions": 30,
"ServerTagName": "JiSheCollectBus2" "ServerTagName": "JiSheCollectBus20"
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
@ -95,7 +95,6 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus3",
"Cassandra": { "Cassandra": {
"ReplicationStrategy": { "ReplicationStrategy": {
"Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy "Class": "NetworkTopologyStrategy", //NetworkTopologyStrategySimpleStrategy