添加拦截器
This commit is contained in:
parent
dc7416bdbf
commit
640fcb754d
@ -1,156 +0,0 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Cassandra;
|
||||
using Cassandra.Mapping;
|
||||
using Microsoft.Extensions.Caching.Memory;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JiShe.CollectBus.Cassandra
|
||||
{
|
||||
public class CassandraQueryOptimizer
|
||||
{
|
||||
private readonly ISession _session;
|
||||
private readonly ILogger<CassandraQueryOptimizer> _logger;
|
||||
private readonly IMemoryCache _cache;
|
||||
private readonly ConcurrentDictionary<string, PreparedStatement> _preparedStatements;
|
||||
private readonly int _batchSize;
|
||||
private readonly TimeSpan _cacheExpiration;
|
||||
|
||||
public CassandraQueryOptimizer(
|
||||
ISession session,
|
||||
ILogger<CassandraQueryOptimizer> logger,
|
||||
IMemoryCache cache,
|
||||
int batchSize = 100,
|
||||
TimeSpan? cacheExpiration = null)
|
||||
{
|
||||
_session = session;
|
||||
_logger = logger;
|
||||
_cache = cache;
|
||||
_preparedStatements = new ConcurrentDictionary<string, PreparedStatement>();
|
||||
_batchSize = batchSize;
|
||||
_cacheExpiration = cacheExpiration ?? TimeSpan.FromMinutes(5);
|
||||
}
|
||||
|
||||
public async Task<PreparedStatement> GetOrPrepareStatementAsync(string cql)
|
||||
{
|
||||
return _preparedStatements.GetOrAdd(cql, key =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var statement = _session.Prepare(key);
|
||||
_logger.LogDebug($"Prepared statement for CQL: {key}");
|
||||
return statement;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"Failed to prepare statement for CQL: {key}");
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public async Task ExecuteBatchAsync(IEnumerable<BoundStatement> statements)
|
||||
{
|
||||
var batch = new BatchStatement();
|
||||
var count = 0;
|
||||
|
||||
foreach (var statement in statements)
|
||||
{
|
||||
batch.Add(statement);
|
||||
count++;
|
||||
|
||||
if (count >= _batchSize)
|
||||
{
|
||||
await ExecuteBatchAsync(batch);
|
||||
batch = new BatchStatement();
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (count > 0)
|
||||
{
|
||||
await ExecuteBatchAsync(batch);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteBatchAsync(BatchStatement batch)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _session.ExecuteAsync(batch);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to execute batch statement");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<T> GetOrSetFromCacheAsync<T>(string cacheKey, Func<Task<T>> getData)
|
||||
{
|
||||
if (_cache.TryGetValue(cacheKey, out T cachedValue))
|
||||
{
|
||||
_logger.LogDebug($"Cache hit for key: {cacheKey}");
|
||||
return cachedValue;
|
||||
}
|
||||
|
||||
var data = await getData();
|
||||
_cache.Set(cacheKey, data, _cacheExpiration);
|
||||
_logger.LogDebug($"Cache miss for key: {cacheKey}, data cached");
|
||||
return data;
|
||||
}
|
||||
|
||||
public async Task<IEnumerable<T>> ExecutePagedQueryAsync<T>(
|
||||
string cql,
|
||||
object[] parameters,
|
||||
int pageSize = 100,
|
||||
string pagingState = null) where T : class
|
||||
{
|
||||
var statement = await GetOrPrepareStatementAsync(cql);
|
||||
var boundStatement = statement.Bind(parameters);
|
||||
|
||||
if (!string.IsNullOrEmpty(pagingState))
|
||||
{
|
||||
boundStatement.SetPagingState(Convert.FromBase64String(pagingState));
|
||||
}
|
||||
|
||||
boundStatement.SetPageSize(pageSize);
|
||||
|
||||
try
|
||||
{
|
||||
var result = await _session.ExecuteAsync(boundStatement);
|
||||
//TODO: RETURN OBJECT
|
||||
throw new NotImplementedException();
|
||||
//result.GetRows()
|
||||
//return result.Select(row => row);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, $"Failed to execute paged query: {cql}");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
|
||||
{
|
||||
var mapper = new Mapper(_session);
|
||||
var batch = new List<BoundStatement>();
|
||||
var cql = $"INSERT INTO {tableName} ({{0}}) VALUES ({{1}})";
|
||||
|
||||
foreach (var chunk in items.Chunk(_batchSize))
|
||||
{
|
||||
var statements = chunk.Select(item =>
|
||||
{
|
||||
var props = typeof(T).GetProperties();
|
||||
var columns = string.Join(", ", props.Select(p => p.Name));
|
||||
var values = string.Join(", ", props.Select(p => "?"));
|
||||
var statement = _session.Prepare(string.Format(cql, columns, values));
|
||||
return statement.Bind(props.Select(p => p.GetValue(item)).ToArray());
|
||||
});
|
||||
|
||||
batch.AddRange(statements);
|
||||
}
|
||||
|
||||
await ExecuteBatchAsync(batch);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,19 +1,13 @@
|
||||
using Cassandra;
|
||||
using Cassandra.Data.Linq;
|
||||
using System.Linq.Expressions;
|
||||
using Cassandra.Mapping;
|
||||
using JiShe.CollectBus.Cassandra.Extensions;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using System.Reflection;
|
||||
using Thrift.Protocol.Entities;
|
||||
using Volo.Abp.Domain.Entities;
|
||||
using Volo.Abp.Domain.Repositories;
|
||||
|
||||
namespace JiShe.CollectBus.Cassandra
|
||||
{
|
||||
public class CassandraRepository<TEntity, TKey>
|
||||
: ICassandraRepository<TEntity, TKey>
|
||||
where TEntity : class
|
||||
where TEntity : class, ICassandraEntity<TKey>
|
||||
{
|
||||
private readonly ICassandraProvider _cassandraProvider;
|
||||
public CassandraRepository(ICassandraProvider cassandraProvider, MappingConfiguration mappingConfig)
|
||||
@ -27,12 +21,29 @@ namespace JiShe.CollectBus.Cassandra
|
||||
|
||||
public virtual async Task<TEntity> GetAsync(TKey id)
|
||||
{
|
||||
return await Mapper.SingleOrDefaultAsync<TEntity>("WHERE id = ?", id);
|
||||
return await GetAsync("WHERE id = ?", id);
|
||||
}
|
||||
|
||||
public virtual async Task<List<TEntity>> GetListAsync()
|
||||
public virtual async Task<TEntity?> GetAsync(string cql, params object[] args)
|
||||
{
|
||||
return (await Mapper.FetchAsync<TEntity>()).ToList();
|
||||
return await Mapper.SingleAsync<TEntity?>(cql, args);
|
||||
}
|
||||
|
||||
|
||||
public virtual async Task<TEntity> FirstOrDefaultAsync(TKey id)
|
||||
{
|
||||
return await FirstOrDefaultAsync("WHERE id = ?", id);
|
||||
}
|
||||
|
||||
public virtual async Task<TEntity?> FirstOrDefaultAsync(string cql, params object[] args)
|
||||
{
|
||||
return await Mapper.FirstOrDefaultAsync<TEntity>(cql, args);
|
||||
}
|
||||
|
||||
|
||||
public virtual async Task<List<TEntity>?> GetListAsync(string? cql = null, params object[] args)
|
||||
{
|
||||
return cql.IsNullOrWhiteSpace() ? (await Mapper.FetchAsync<TEntity>()).ToList() : (await Mapper.FetchAsync<TEntity>(cql, args)).ToList();
|
||||
}
|
||||
|
||||
public virtual async Task<TEntity> InsertAsync(TEntity entity)
|
||||
|
||||
@ -1,7 +1,4 @@
|
||||
using Cassandra;
|
||||
using Cassandra.Mapping;
|
||||
using JiShe.CollectBus.Cassandra.Mappers;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.Autofac;
|
||||
using Volo.Abp.Modularity;
|
||||
|
||||
@ -1,10 +1,4 @@
|
||||
using Autofac.Core;
|
||||
using Cassandra;
|
||||
using Cassandra.Mapping;
|
||||
using JiShe.CollectBus.Cassandra;
|
||||
using JiShe.CollectBus.Cassandra.Mappers;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System.Reflection;
|
||||
using JiShe.CollectBus.Cassandra;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.Modularity;
|
||||
|
||||
@ -26,8 +20,6 @@ namespace Microsoft.Extensions.DependencyInjection
|
||||
public static void AddCassandra(this ServiceConfigurationContext context)
|
||||
{
|
||||
context.Services.AddTransient(typeof(ICassandraRepository<,>), typeof(CassandraRepository<,>));
|
||||
context.Services.AddSingleton(new MappingConfiguration()
|
||||
.Define(new CollectBusMapping()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,9 +3,7 @@ using System.Text;
|
||||
using Cassandra;
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
using Cassandra.Mapping;
|
||||
using Cassandra.Data.Linq;
|
||||
using Thrift.Protocol.Entities;
|
||||
using Volo.Abp.Data;
|
||||
|
||||
namespace JiShe.CollectBus.Cassandra.Extensions
|
||||
{
|
||||
@ -16,17 +14,26 @@ namespace JiShe.CollectBus.Cassandra.Extensions
|
||||
var type = typeof(TEntity);
|
||||
var tableAttribute = type.GetCustomAttribute<CassandraTableAttribute>();
|
||||
var tableName = tableAttribute?.Name ?? type.Name.ToLower();
|
||||
//var tableKeyspace = tableAttribute?.Keyspace ?? defaultKeyspace;
|
||||
var tableKeyspace = session.Keyspace;
|
||||
|
||||
var properties = type.GetProperties();
|
||||
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<KeyAttribute>() != null);
|
||||
|
||||
// 分区键设置
|
||||
var primaryKey = properties.FirstOrDefault(p => p.GetCustomAttribute<PartitionKeyAttribute>() != null);
|
||||
|
||||
if (primaryKey == null)
|
||||
{
|
||||
throw new InvalidOperationException($"No primary key defined for type {type.Name}");
|
||||
}
|
||||
|
||||
// 集群键设置
|
||||
var clusteringKeys = properties.Where(p => p.GetCustomAttribute<ClusteringKeyAttribute>() != null).Select(a=>a.Name).ToList();
|
||||
var clusteringKeyCql = string.Empty;
|
||||
if (clusteringKeys.Any())
|
||||
{
|
||||
clusteringKeyCql = $", {string.Join(", ", clusteringKeys)}";
|
||||
}
|
||||
|
||||
var cql = new StringBuilder();
|
||||
cql.Append($"CREATE TABLE IF NOT EXISTS {tableKeyspace}.{tableName} (");
|
||||
|
||||
@ -40,7 +47,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
|
||||
cql.Append($"{columnName} {cqlType}, ");
|
||||
}
|
||||
cql.Length -= 2; // Remove last comma and space
|
||||
cql.Append($", PRIMARY KEY ({primaryKey.Name.ToLower()}))");
|
||||
cql.Append($", PRIMARY KEY (({primaryKey.Name.ToLower()}){clusteringKeyCql}))");
|
||||
|
||||
session.Execute(cql.ToString());
|
||||
}
|
||||
@ -61,6 +68,7 @@ namespace JiShe.CollectBus.Cassandra.Extensions
|
||||
if (type == typeof(Guid)) return "uuid";
|
||||
if (type == typeof(DateTimeOffset)) return "timestamp";
|
||||
if (type == typeof(Byte[])) return "blob";
|
||||
if (type == typeof(ExtraPropertyDictionary)) return "map<text,text>";
|
||||
|
||||
// 处理集合类型
|
||||
if (type.IsGenericType)
|
||||
@ -72,6 +80,8 @@ namespace JiShe.CollectBus.Cassandra.Extensions
|
||||
return $"list<{GetCassandraType(elementType)}>";
|
||||
if (genericType == typeof(HashSet<>))
|
||||
return $"set<{GetCassandraType(elementType)}>";
|
||||
if (genericType == typeof(Nullable<>))
|
||||
return GetCassandraType(elementType);
|
||||
if (genericType == typeof(Dictionary<,>))
|
||||
{
|
||||
var keyType = type.GetGenericArguments()[0];
|
||||
|
||||
@ -10,7 +10,10 @@ namespace JiShe.CollectBus.Cassandra
|
||||
public interface ICassandraRepository<TEntity, TKey> where TEntity : class
|
||||
{
|
||||
Task<TEntity> GetAsync(TKey id);
|
||||
Task<List<TEntity>> GetListAsync();
|
||||
Task<TEntity?> GetAsync(string cql, params object[] args);
|
||||
Task<TEntity> FirstOrDefaultAsync(TKey id);
|
||||
Task<TEntity?> FirstOrDefaultAsync(string cql, params object[] args);
|
||||
Task<List<TEntity>?> GetListAsync(string? cql = null, params object[] args);
|
||||
Task<TEntity> InsertAsync(TEntity entity);
|
||||
Task<TEntity> UpdateAsync(TEntity entity);
|
||||
Task DeleteAsync(TEntity entity);
|
||||
|
||||
@ -15,8 +15,8 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\services\JiShe.CollectBus.Domain\JiShe.CollectBus.Domain.csproj" />
|
||||
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
<ProjectReference Include="..\..\shared\JiShe.CollectBus.Domain.Shared\JiShe.CollectBus.Domain.Shared.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@ -4,14 +4,12 @@ using Volo.Abp.EventBus;
|
||||
namespace JiShe.CollectBus.Samples;
|
||||
|
||||
[EventName("Sample.Kafka.Test")]
|
||||
[TopicName("Test1")]
|
||||
public class SampleDto
|
||||
{
|
||||
public int Value { get; set; }
|
||||
}
|
||||
|
||||
[EventName("Sample.Kafka.Test2")]
|
||||
[TopicName("Test2")]
|
||||
public class SampleDto2
|
||||
{
|
||||
public int Value { get; set; }
|
||||
|
||||
@ -11,9 +11,11 @@ using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Reflection;
|
||||
using System.Threading.Tasks;
|
||||
using Cassandra.Mapping;
|
||||
using JiShe.CollectBus.Cassandra;
|
||||
using JiShe.CollectBus.FreeRedis;
|
||||
using JiShe.CollectBus.IoTDB;
|
||||
using JiShe.CollectBus.Mappers;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.Application;
|
||||
using Volo.Abp.Autofac;
|
||||
@ -23,6 +25,8 @@ using Volo.Abp.BackgroundWorkers.Hangfire;
|
||||
using Volo.Abp.EventBus;
|
||||
using Volo.Abp.Modularity;
|
||||
using Microsoft.Extensions.Options;
|
||||
using JiShe.CollectBus.Interceptors;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
|
||||
namespace JiShe.CollectBus;
|
||||
|
||||
@ -50,6 +54,23 @@ public class CollectBusApplicationModule : AbpModule
|
||||
{
|
||||
options.AddMaps<CollectBusApplicationModule>(validate: true);
|
||||
});
|
||||
|
||||
context.Services.AddSingleton(new MappingConfiguration()
|
||||
.Define(new CollectBusMapping()));
|
||||
|
||||
// 注册拦截器
|
||||
context.Services.OnRegistered(ctx =>
|
||||
{
|
||||
var methods = ctx.ImplementationType.GetMethods();
|
||||
foreach (var method in methods)
|
||||
{
|
||||
var attr = method.GetCustomAttribute(typeof(LogInterceptAttribute), true);
|
||||
if (attr != null)
|
||||
{
|
||||
ctx.Interceptors.TryAdd<LogInterceptor>();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public override async Task OnApplicationInitializationAsync(
|
||||
@ -69,10 +90,9 @@ public class CollectBusApplicationModule : AbpModule
|
||||
|
||||
//初始化主题信息
|
||||
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
|
||||
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
|
||||
var kafkaOptions = context.ServiceProvider.GetRequiredService<IOptions<KafkaOptionConfig>>();
|
||||
|
||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||
var topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||
|
||||
foreach (var item in topics)
|
||||
|
||||
@ -0,0 +1,9 @@
|
||||
using System;
|
||||
|
||||
namespace JiShe.CollectBus.Interceptors
|
||||
{
|
||||
[AttributeUsage(AttributeTargets.Method)]
|
||||
public class LogInterceptAttribute : Attribute
|
||||
{
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,60 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.DependencyInjection;
|
||||
using Volo.Abp.DynamicProxy;
|
||||
|
||||
namespace JiShe.CollectBus.Interceptors
|
||||
{
|
||||
public class LogInterceptor : AbpInterceptor, ITransientDependency
|
||||
{
|
||||
public override async Task InterceptAsync(IAbpMethodInvocation invocation)
|
||||
{
|
||||
// 方法执行前的逻辑(如果需要)
|
||||
|
||||
try
|
||||
{
|
||||
// 执行原始方法
|
||||
await invocation.ProceedAsync();
|
||||
|
||||
// 方法执行成功后,返回前的逻辑
|
||||
await OnSuccessAsync(invocation);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// 出现异常时的逻辑
|
||||
await OnExceptionAsync(invocation, ex);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// 方法结束前一定会执行的逻辑
|
||||
await OnCompleteAsync(invocation);
|
||||
}
|
||||
}
|
||||
|
||||
private Task OnSuccessAsync(IAbpMethodInvocation invocation)
|
||||
{
|
||||
// 方法执行成功后的逻辑
|
||||
// 可以访问 invocation.ReturnValue 获取返回值
|
||||
Console.WriteLine($"方法 {invocation.Method.Name} 成功执行,返回值:{invocation.ReturnValue}");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private Task OnExceptionAsync(IAbpMethodInvocation invocation, Exception ex)
|
||||
{
|
||||
// 方法执行异常时的逻辑
|
||||
Console.WriteLine($"方法 {invocation.Method.Name} 执行异常:{ex.Message}");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private Task OnCompleteAsync(IAbpMethodInvocation invocation)
|
||||
{
|
||||
// 无论成功失败,方法执行完毕后的逻辑
|
||||
Console.WriteLine($"方法 {invocation.Method.Name} 执行完毕");
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,13 +1,8 @@
|
||||
using Cassandra.Mapping;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.IotSystems.Devices;
|
||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||
using static Cassandra.QueryTrace;
|
||||
|
||||
namespace JiShe.CollectBus.Cassandra.Mappers
|
||||
namespace JiShe.CollectBus.Mappers
|
||||
{
|
||||
public class CollectBusMapping: Mappings
|
||||
{
|
||||
@ -15,6 +10,8 @@ namespace JiShe.CollectBus.Cassandra.Mappers
|
||||
{
|
||||
For<MessageIssued>()
|
||||
.Column(e => e.Type, cm => cm.WithName("type").WithDbType<int>());
|
||||
For<Device>()
|
||||
.Column(e => e.Status, cm => cm.WithName("status").WithDbType<int>());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ using Volo.Abp.Domain.Repositories;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using Cassandra;
|
||||
using JiShe.CollectBus.Interceptors;
|
||||
|
||||
namespace JiShe.CollectBus.Samples;
|
||||
|
||||
@ -35,11 +36,10 @@ public class TestAppService : CollectBusAppService
|
||||
private readonly ICassandraRepository<MessageIssued, string> _messageReceivedCassandraRepository;
|
||||
private readonly ICassandraProvider _cassandraProvider;
|
||||
|
||||
|
||||
|
||||
public TestAppService(
|
||||
ILogger<TestAppService> logger,
|
||||
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository, ICassandraProvider cassandraProvider)
|
||||
ICassandraRepository<MessageIssued, string> messageReceivedCassandraRepository,
|
||||
ICassandraProvider cassandraProvider)
|
||||
{
|
||||
_logger = logger;
|
||||
_messageReceivedCassandraRepository = messageReceivedCassandraRepository;
|
||||
@ -122,4 +122,11 @@ public class TestAppService : CollectBusAppService
|
||||
// 等待所有批处理完成
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
|
||||
[LogIntercept]
|
||||
public async Task<string> LogInterceptorTest(string str)
|
||||
{
|
||||
_logger.LogWarning(str);
|
||||
return str;
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,8 +29,6 @@ namespace JiShe.CollectBus.Subscribers
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
|
||||
private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;
|
||||
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
|
||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||
private readonly IIoTDBProvider _dbProvider;
|
||||
|
||||
@ -42,15 +40,12 @@ namespace JiShe.CollectBus.Subscribers
|
||||
/// <param name="serviceProvider">The service provider.</param>
|
||||
/// <param name="messageReceivedLoginEventRepository">The message received login event repository.</param>
|
||||
/// <param name="messageReceivedHeartbeatEventRepository">The message received heartbeat event repository.</param>
|
||||
/// <param name="messageReceivedEventRepository">The message received event repository.</param>
|
||||
/// <param name="deviceRepository">The device repository.</param>
|
||||
/// <param name="meterReadingRecordsRepository">The device repository.</param>
|
||||
public SubscriberAppService(ILogger<SubscriberAppService> logger,
|
||||
ITcpService tcpService, IServiceProvider serviceProvider,
|
||||
ITcpService tcpService,
|
||||
IServiceProvider serviceProvider,
|
||||
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
||||
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
||||
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
|
||||
IRepository<Device, Guid> deviceRepository,
|
||||
IIoTDBProvider dbProvider,
|
||||
IMeterReadingRecordRepository meterReadingRecordsRepository)
|
||||
{
|
||||
@ -59,8 +54,6 @@ namespace JiShe.CollectBus.Subscribers
|
||||
_serviceProvider = serviceProvider;
|
||||
_messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
|
||||
_messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
|
||||
_messageReceivedEventRepository = messageReceivedEventRepository;
|
||||
_deviceRepository = deviceRepository;
|
||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||
_dbProvider = dbProvider;
|
||||
}
|
||||
|
||||
@ -10,7 +10,6 @@ using Volo.Abp.Uow;
|
||||
|
||||
namespace JiShe.CollectBus.Workers
|
||||
{
|
||||
[IgnoreJob]
|
||||
public class EpiCollectWorker : HangfireBackgroundWorkerBase, ITransientDependency,ICollectWorker
|
||||
{
|
||||
private readonly ILogger<EpiCollectWorker> _logger;
|
||||
|
||||
@ -1,14 +1,18 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
using JiShe.CollectBus.Enums;
|
||||
using Volo.Abp.Auditing;
|
||||
using Volo.Abp.Domain.Entities;
|
||||
using Volo.Abp.Logging;
|
||||
|
||||
namespace JiShe.CollectBus.IotSystems.Devices
|
||||
{
|
||||
public class Device : AggregateRoot<Guid>
|
||||
public class Device : BasicAggregateRoot<Guid>
|
||||
{
|
||||
/// <summary>
|
||||
/// Device
|
||||
@ -20,6 +24,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
|
||||
/// <param name="status"></param>
|
||||
public Device(string number, string clientId, DateTime firstOnlineTime, DateTime lastOnlineTime, DeviceStatus status)
|
||||
{
|
||||
Id = Guid.NewGuid();
|
||||
Number = number;
|
||||
FirstOnlineTime = firstOnlineTime;
|
||||
LastOnlineTime = lastOnlineTime;
|
||||
@ -30,6 +35,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
|
||||
/// <summary>
|
||||
/// 集中器编号,在集中器登录时解析获取,并会更新为当前TCP连接的最新ClientId
|
||||
/// </summary>
|
||||
[PartitionKey]
|
||||
public string Number { get; set; }
|
||||
|
||||
/// <summary>
|
||||
@ -55,6 +61,7 @@ namespace JiShe.CollectBus.IotSystems.Devices
|
||||
/// <summary>
|
||||
/// 设备状态
|
||||
/// </summary>
|
||||
[PartitionKey]
|
||||
public DeviceStatus Status { get; set; }
|
||||
|
||||
/// <summary>
|
||||
|
||||
@ -11,19 +11,14 @@ using Volo.Abp.Domain.Entities;
|
||||
namespace JiShe.CollectBus.IotSystems.MessageIssueds
|
||||
{
|
||||
[CassandraTable]
|
||||
public class MessageIssued:IEntity<string>
|
||||
public class MessageIssued: ICassandraEntity<string>
|
||||
{
|
||||
public string ClientId { get; set; }
|
||||
public byte[] Message { get; set; }
|
||||
public string DeviceNo { get; set; }
|
||||
public IssuedEventType Type { get; set; }
|
||||
public string MessageId { get; set; }
|
||||
[Key]
|
||||
[PartitionKey]
|
||||
public string Id { get; set; }
|
||||
|
||||
public object?[] GetKeys()
|
||||
{
|
||||
return new object[] { Id };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,4 +21,20 @@ namespace JiShe.CollectBus.Common.Attributes
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
|
||||
public class PartitionKeyAttribute : Attribute
|
||||
{
|
||||
public PartitionKeyAttribute()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field)]
|
||||
public class ClusteringKeyAttribute : Attribute
|
||||
{
|
||||
public ClusteringKeyAttribute()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,13 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Common.Attributes
|
||||
{
|
||||
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
|
||||
public class IgnoreJobAttribute : Attribute
|
||||
{
|
||||
}
|
||||
}
|
||||
@ -4,7 +4,7 @@ using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.Common.AttributeInfo
|
||||
namespace JiShe.CollectBus.Common.Attributes
|
||||
{
|
||||
/// <summary>
|
||||
/// 排序序号
|
||||
@ -1,23 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Volo.Abp.EventBus;
|
||||
using Volo.Abp;
|
||||
|
||||
namespace JiShe.CollectBus.Common.Attributes
|
||||
{
|
||||
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
|
||||
public class TopicNameAttribute : Attribute
|
||||
{
|
||||
public virtual string Name { get; }
|
||||
|
||||
public TopicNameAttribute(string name)
|
||||
{
|
||||
this.Name = Check.NotNullOrWhiteSpace(name, nameof(name));
|
||||
}
|
||||
|
||||
public string GetName(Type eventType) => this.Name;
|
||||
}
|
||||
}
|
||||
@ -8,7 +8,7 @@ using System.Runtime.InteropServices;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.Common.AttributeInfo;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
|
||||
namespace JiShe.CollectBus.Common.Helpers
|
||||
{
|
||||
|
||||
14
shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs
Normal file
14
shared/JiShe.CollectBus.Domain.Shared/CassandraBaseEntity.cs
Normal file
@ -0,0 +1,14 @@
|
||||
using System;
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using JiShe.CollectBus.Common.Attributes;
|
||||
|
||||
namespace JiShe.CollectBus
|
||||
{
|
||||
public class CassandraBaseEntity<TKey>: ICassandraEntity<TKey>
|
||||
{
|
||||
/// <summary>
|
||||
/// Id
|
||||
/// </summary>
|
||||
public TKey Id { get; set; }
|
||||
}
|
||||
}
|
||||
19
shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs
Normal file
19
shared/JiShe.CollectBus.Domain.Shared/ICassandraEntity.cs
Normal file
@ -0,0 +1,19 @@
|
||||
using System;
|
||||
|
||||
namespace JiShe.CollectBus
|
||||
{
|
||||
public interface ICassandraEntity<TKey>
|
||||
{
|
||||
TKey Id { get; set; }
|
||||
}
|
||||
|
||||
public interface IHasCreationTime
|
||||
{
|
||||
DateTime CreationTime { get; set; }
|
||||
}
|
||||
|
||||
public interface IHasLastModificationTime
|
||||
{
|
||||
DateTime? LastModificationTime { get; set; }
|
||||
}
|
||||
}
|
||||
@ -31,4 +31,8 @@
|
||||
<None Remove="JiShe.CollectBus.Domain.Shared.abppkg" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\JiShe.CollectBus.Common\JiShe.CollectBus.Common.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@ -84,7 +84,7 @@
|
||||
"SaslPassword": "lixiao1980",
|
||||
"KafkaReplicationFactor": 3,
|
||||
"NumPartitions": 30,
|
||||
"ServerTagName": "JiSheCollectBus2"
|
||||
"ServerTagName": "JiSheCollectBus20"
|
||||
},
|
||||
"IoTDBOptions": {
|
||||
"UserName": "root",
|
||||
@ -95,7 +95,6 @@
|
||||
"OpenDebugMode": true,
|
||||
"UseTableSessionPoolByDefault": false
|
||||
},
|
||||
"ServerTagName": "JiSheCollectBus3",
|
||||
"Cassandra": {
|
||||
"ReplicationStrategy": {
|
||||
"Class": "NetworkTopologyStrategy", //策略为NetworkTopologyStrategy时才会有多个数据中心,SimpleStrategy用在只有一个数据中心的情况下
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user