Compare commits
No commits in common. "7696b7379be5510416b7fe2f065e5a82cc234c54" and "50639f6fcf204c42c60be43f254418b9bfc7250d" have entirely different histories.
7696b7379b
...
50639f6fcf
@ -1,17 +1,12 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Apache.IoTDB.DataStructure;
|
||||
using Apache.IoTDB;
|
||||
using Confluent.Kafka;
|
||||
using JiShe.CollectBus.Ammeters;
|
||||
using JiShe.CollectBus.FreeSql;
|
||||
using JiShe.CollectBus.IoTDBProvider;
|
||||
using JiShe.CollectBus.IotSystems.PrepayModel;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.Extensions.Options;
|
||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||
|
||||
namespace JiShe.CollectBus.Samples;
|
||||
|
||||
@ -19,20 +14,15 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
|
||||
{
|
||||
|
||||
private readonly IIoTDBProvider _iotDBProvider;
|
||||
private readonly IoTDBRuntimeContext _dbContext;
|
||||
private readonly IoTDBOptions _options;
|
||||
|
||||
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options,
|
||||
IoTDBRuntimeContext dbContext)
|
||||
public SampleAppService(IIoTDBProvider iotDBProvider)
|
||||
{
|
||||
_iotDBProvider = iotDBProvider;
|
||||
_options = options.Value;
|
||||
_dbContext = dbContext;
|
||||
}
|
||||
|
||||
|
||||
[HttpGet]
|
||||
public async Task UseSessionPool()
|
||||
public async Task AddReadingAsync(int buildTabletMode = 1)
|
||||
{
|
||||
ElectricityMeter meter = new ElectricityMeter()
|
||||
{
|
||||
@ -42,34 +32,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
|
||||
Current = 10,
|
||||
MeterModel = "DDZY-1980",
|
||||
ProjectCode = "10059",
|
||||
Voltage = 10,
|
||||
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
Voltage = 10
|
||||
};
|
||||
await _iotDBProvider.InsertAsync(meter);
|
||||
await _iotDBProvider.InsertAsync(meter, buildTabletMode);
|
||||
}
|
||||
|
||||
|
||||
[HttpGet]
|
||||
public async Task UseTableSessionPool()
|
||||
{
|
||||
//_dbContext.UseTableSessionPool = true;
|
||||
_iotDBProvider.SwitchSessionPool(true);
|
||||
|
||||
ElectricityMeter meter = new ElectricityMeter()
|
||||
{
|
||||
SystemName = "energy",
|
||||
DeviceId = "402440506",
|
||||
DeviceType = "Ammeter",
|
||||
Current = 10,
|
||||
MeterModel = "DDZY-1980",
|
||||
ProjectCode = "10059",
|
||||
Voltage = 10,
|
||||
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
};
|
||||
await _iotDBProvider.InsertAsync(meter);
|
||||
}
|
||||
|
||||
|
||||
public Task<SampleDto> GetAsync()
|
||||
{
|
||||
return Task.FromResult(
|
||||
|
||||
@ -40,7 +40,7 @@
|
||||
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
||||
},
|
||||
"Redis": {
|
||||
"Configuration": "118.190.144.92:6380,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||
"DefaultDB": "14",
|
||||
"HangfireDB": "15"
|
||||
},
|
||||
@ -77,9 +77,8 @@
|
||||
"UserName": "root",
|
||||
"Password": "root",
|
||||
"ClusterList": [ "192.168.56.102:6667" ],
|
||||
"PoolSize": 2,
|
||||
"PoolSize": 10,
|
||||
"DataBaseName": "energy",
|
||||
"OpenDebugMode": true,
|
||||
"UseTableSessionPoolByDefault": false
|
||||
"OpenDebugMode": true
|
||||
}
|
||||
}
|
||||
@ -1,7 +1,4 @@
|
||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
||||
using JiShe.CollectBus.IoTDBProvider.Provider;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
@ -16,16 +13,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
public override void ConfigureServices(ServiceConfigurationContext context)
|
||||
{
|
||||
context.Services.Configure<IoTDBOptions>(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions)));
|
||||
|
||||
// 注册上下文为Scoped
|
||||
context.Services.AddScoped<IoTDBRuntimeContext>();
|
||||
|
||||
// 注册Session工厂
|
||||
context.Services.AddSingleton<IIoTDBSessionFactory, IoTDBSessionFactory>();
|
||||
|
||||
// 注册Provider
|
||||
context.Services.AddScoped<IIoTDBProvider, IoTDBProvider>();
|
||||
|
||||
context.Services.AddSingleton<IIoTDBProvider, IoTDBProvider>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,36 +0,0 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Context
|
||||
{
|
||||
/// <summary>
|
||||
/// IoTDB SessionPool 运行时上下文
|
||||
/// </summary>
|
||||
public class IoTDBRuntimeContext
|
||||
{
|
||||
private readonly bool _defaultValue;
|
||||
|
||||
public IoTDBRuntimeContext(IOptions<IoTDBOptions> options)
|
||||
{
|
||||
_defaultValue = options.Value.UseTableSessionPoolByDefault;
|
||||
UseTableSessionPool = _defaultValue;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 是否使用表模型存储, 默认false,使用tree模型存储
|
||||
/// </summary>
|
||||
public bool UseTableSessionPool { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 重置为默认值
|
||||
/// </summary>
|
||||
public void ResetToDefault()
|
||||
{
|
||||
UseTableSessionPool = _defaultValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -7,31 +7,27 @@ using System.Threading.Tasks;
|
||||
namespace JiShe.CollectBus.IoTDBProvider
|
||||
{
|
||||
/// <summary>
|
||||
/// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置
|
||||
/// IoTDB数据源
|
||||
/// </summary>
|
||||
public interface IIoTDBProvider
|
||||
{
|
||||
/// <summary>
|
||||
/// 切换 SessionPool
|
||||
/// </summary>
|
||||
/// <param name="useTableSession">是否使用表模型</param>
|
||||
void SwitchSessionPool(bool useTableSession);
|
||||
|
||||
/// <summary>
|
||||
/// 插入数据
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entity"></param>
|
||||
/// <param name="buildTabletMode">构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名</param>
|
||||
/// <returns></returns>
|
||||
Task InsertAsync<T>(T entity) where T : IoTEntity;
|
||||
Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity;
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入数据
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entities"></param>
|
||||
/// <param name="buildTabletMode">构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名</param>
|
||||
/// <returns></returns>
|
||||
Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity;
|
||||
Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity;
|
||||
|
||||
|
||||
/// <summary>
|
||||
|
||||
@ -1,16 +0,0 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Interface
|
||||
{
|
||||
/// <summary>
|
||||
/// Session 工厂接口
|
||||
/// </summary>
|
||||
public interface IIoTDBSessionFactory
|
||||
{
|
||||
IIoTDBSessionPool GetSessionPool(bool useTableSession);
|
||||
}
|
||||
}
|
||||
@ -1,35 +0,0 @@
|
||||
using Apache.IoTDB.DataStructure;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Interface
|
||||
{
|
||||
/// <summary>
|
||||
/// Session 连接池
|
||||
/// </summary>
|
||||
public interface IIoTDBSessionPool : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// 打开连接池
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task OpenAsync();
|
||||
|
||||
/// <summary>
|
||||
/// 插入数据
|
||||
/// </summary>
|
||||
/// <param name="tablet"></param>
|
||||
/// <returns></returns>
|
||||
Task<int> InsertAsync(Tablet tablet);
|
||||
|
||||
/// <summary>
|
||||
/// 查询数据
|
||||
/// </summary>
|
||||
/// <param name="sql"></param>
|
||||
/// <returns></returns>
|
||||
Task<SessionDataSet> ExecuteQueryStatementAsync(string sql);
|
||||
}
|
||||
}
|
||||
@ -12,7 +12,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
public class IoTDBOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// 数据库名称,表模型才有,树模型为空
|
||||
/// 数据库名称
|
||||
/// </summary>
|
||||
public string DataBaseName { get; set; }
|
||||
|
||||
@ -32,21 +32,11 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// <summary>
|
||||
/// 连接池大小
|
||||
/// </summary>
|
||||
public int PoolSize { get; set; } = 2;
|
||||
|
||||
/// <summary>
|
||||
/// 查询时,每次查询的数据量,默认1024
|
||||
/// </summary>
|
||||
public int FetchSize { get; set; } = 1024;
|
||||
public int PoolSize { get; set; } = 3;
|
||||
|
||||
/// <summary>
|
||||
/// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。
|
||||
/// </summary>
|
||||
public bool OpenDebugMode { get; set;}
|
||||
|
||||
/// <summary>
|
||||
/// 是否使用表模型存储, 默认false,使用tree模型存储
|
||||
/// </summary>
|
||||
public bool UseTableSessionPoolByDefault { get; set; } = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,20 +12,18 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
public class QueryOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// 表模型的表名称或者树模型的设备路径
|
||||
/// 表名或标签名
|
||||
/// </summary>
|
||||
public required string TableNameOrTreePath { get; set; }
|
||||
public required string TableNameOrTagName { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 分页
|
||||
/// </summary>
|
||||
public int Page { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 分页大小
|
||||
/// </summary>
|
||||
public int PageSize { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 查询条件
|
||||
/// </summary>
|
||||
|
||||
@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entity"></param>
|
||||
/// <returns></returns>
|
||||
public static string GetDevicePath<T>(T entity) where T : IoTEntity
|
||||
public static string GetDeviceId<T>(T entity) where T : IoTEntity
|
||||
{
|
||||
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`";
|
||||
}
|
||||
|
||||
@ -2,10 +2,6 @@
|
||||
using Apache.IoTDB.DataStructure;
|
||||
using JiShe.CollectBus.Common.Extensions;
|
||||
using JiShe.CollectBus.Common.Helpers;
|
||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
||||
using JiShe.CollectBus.IoTDBProvider.Provider;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
@ -21,54 +17,34 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// <summary>
|
||||
/// IoTDB数据源
|
||||
/// </summary>
|
||||
public class IoTDBProvider : IIoTDBProvider
|
||||
public class IoTDBProvider : IIoTDBProvider, IDisposable
|
||||
{
|
||||
private IIoTDBSessionPool _currentSession;
|
||||
private readonly IoTDBOptions _options;
|
||||
private readonly TableSessionPool _sessionPool;
|
||||
private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();
|
||||
private readonly ILogger<IoTDBProvider> _logger;
|
||||
private readonly IIoTDBSessionFactory _sessionFactory;
|
||||
private readonly IoTDBRuntimeContext _runtimeContext;
|
||||
|
||||
public IoTDBProvider(
|
||||
ILogger<IoTDBProvider> logger,
|
||||
IIoTDBSessionFactory sessionFactory,
|
||||
IoTDBRuntimeContext runtimeContext)
|
||||
public IoTDBProvider(IOptions<IoTDBOptions> options, ILogger<IoTDBProvider> logger)
|
||||
{
|
||||
_options = options.Value;
|
||||
|
||||
_sessionPool = new TableSessionPool.Builder()
|
||||
.SetNodeUrls(_options.ClusterList)
|
||||
.SetUsername(_options.UserName)
|
||||
.SetPassword(_options.Password)
|
||||
.SetDatabase(_options.DataBaseName)
|
||||
.SetFetchSize(_options.PoolSize)
|
||||
.Build();
|
||||
|
||||
_sessionPool.Open(false).Wait();
|
||||
if (_options.OpenDebugMode)
|
||||
{
|
||||
_sessionPool.OpenDebugMode(builder =>
|
||||
{
|
||||
builder.AddConsole();
|
||||
});
|
||||
}
|
||||
_logger = logger;
|
||||
_sessionFactory = sessionFactory;
|
||||
_runtimeContext = runtimeContext;
|
||||
|
||||
RefreshSessionPool();
|
||||
}
|
||||
|
||||
private void RefreshSessionPool()
|
||||
{
|
||||
try
|
||||
{
|
||||
_currentSession?.Dispose();
|
||||
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
|
||||
_currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
//自动回退到备用SessionPool
|
||||
_logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败,回退到默认配置:{ex.Serialize()}");
|
||||
_runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool;
|
||||
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 切换 SessionPool
|
||||
/// </summary>
|
||||
/// <param name="useTableSession"></param>
|
||||
public void SwitchSessionPool(bool useTableSession)
|
||||
{
|
||||
if (_runtimeContext.UseTableSessionPool != useTableSession)
|
||||
{
|
||||
_runtimeContext.UseTableSessionPool = useTableSession;
|
||||
RefreshSessionPool();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -77,17 +53,15 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entity"></param>
|
||||
/// <returns></returns>
|
||||
public async Task InsertAsync<T>(T entity) where T : IoTEntity
|
||||
public async Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity
|
||||
{
|
||||
var metadata = GetMetadata<T>();
|
||||
|
||||
var tablet = BuildTablet(new[] { entity }, metadata);
|
||||
|
||||
int result = await _currentSession.InsertAsync(tablet);
|
||||
|
||||
if (result <= 0)
|
||||
var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode);
|
||||
var result = await _sessionPool.InsertAsync(tablet);
|
||||
if (result <=0)
|
||||
{
|
||||
_logger.LogError($"{typeof(T).Name}插入数据没有成功");
|
||||
_logger.LogWarning($"{typeof(T).Name}插入数据没有成功");
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,8 +69,9 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// 批量插入数据
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entities"></param>
|
||||
/// <returns></returns>
|
||||
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
|
||||
public async Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity
|
||||
{
|
||||
var metadata = GetMetadata<T>();
|
||||
|
||||
@ -105,8 +80,8 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
|
||||
foreach (var batch in batches)
|
||||
{
|
||||
var tablet = BuildTablet(batch, metadata);
|
||||
var result = await _currentSession.InsertAsync(tablet);
|
||||
var tablet = BuildTablet(batch, metadata, buildTabletMode);
|
||||
var result = await _sessionPool.InsertAsync(tablet);
|
||||
if (result <= 0)
|
||||
{
|
||||
_logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。");
|
||||
@ -124,7 +99,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
|
||||
{
|
||||
var query = BuildDeleteSQL<T>(options);
|
||||
var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
|
||||
var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);
|
||||
|
||||
if (!sessionDataSet.HasNext())
|
||||
{
|
||||
@ -137,6 +112,9 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
return row.Values[0];
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 查询数据
|
||||
/// </summary>
|
||||
@ -146,7 +124,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
|
||||
{
|
||||
var query = BuildQuerySQL<T>(options);
|
||||
var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
|
||||
var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);
|
||||
|
||||
var result = new PagedResult<T>
|
||||
{
|
||||
@ -158,17 +136,17 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 构建Tablet
|
||||
/// 构建表模型
|
||||
/// </summary>
|
||||
/// <typeparam name="T"></typeparam>
|
||||
/// <param name="entities">表实体</param>
|
||||
/// <param name="metadata">设备元数据</param></param>
|
||||
/// <param name="metadata">设备元数据</param>
|
||||
/// <param name="buildTabletMode">构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名</param>
|
||||
/// <returns></returns>
|
||||
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
|
||||
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata, int buildTabletMode) where T : IoTEntity
|
||||
{
|
||||
var timestamps = new List<long>();
|
||||
var values = new List<List<object>>();
|
||||
var devicePaths = new HashSet<string>();
|
||||
|
||||
foreach (var entity in entities)
|
||||
{
|
||||
@ -177,70 +155,41 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
foreach (var measurement in metadata.ColumnNames)
|
||||
{
|
||||
var value = typeof(T).GetProperty(measurement)?.GetValue(entity);
|
||||
rowValues.Add(value ?? new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。"));
|
||||
if (value == null)
|
||||
{
|
||||
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。");
|
||||
}
|
||||
rowValues.Add(value);
|
||||
}
|
||||
values.Add(rowValues);
|
||||
if (!_runtimeContext.UseTableSessionPool)//树模型
|
||||
{
|
||||
devicePaths.Add(DevicePathBuilder.GetDevicePath(entity));
|
||||
}
|
||||
else
|
||||
{
|
||||
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
|
||||
}
|
||||
}
|
||||
|
||||
if (devicePaths.Count > 1)
|
||||
if (buildTabletMode == 1)
|
||||
{
|
||||
throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。");
|
||||
return new Tablet(
|
||||
DevicePathBuilder.GetDeviceId(entities.First()),
|
||||
metadata.ColumnNames,
|
||||
metadata.ColumnCategories,
|
||||
metadata.DataTypes,
|
||||
values,
|
||||
timestamps
|
||||
);
|
||||
}
|
||||
else if (buildTabletMode == 2)
|
||||
{
|
||||
return new Tablet(
|
||||
DevicePathBuilder.GetTableName<T>(),
|
||||
metadata.ColumnNames,
|
||||
metadata.ColumnCategories,
|
||||
metadata.DataTypes,
|
||||
values,
|
||||
timestamps
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,buildTabletMode参数值不正确,请赋值以后重新处理。");
|
||||
}
|
||||
|
||||
return _runtimeContext.UseTableSessionPool
|
||||
? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps)
|
||||
: BuildSessionTablet(metadata, devicePaths.First(), values, timestamps);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 构建tree模型的Tablet
|
||||
/// </summary>
|
||||
/// <param name="metadata"></param>
|
||||
/// <param name="devicePath"></param>
|
||||
/// <param name="values"></param>
|
||||
/// <param name="timestamps"></param>
|
||||
/// <returns></returns>
|
||||
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath,
|
||||
List<List<object>> values, List<long> timestamps)
|
||||
{
|
||||
return new Tablet(
|
||||
devicePath,
|
||||
metadata.ColumnNames,
|
||||
metadata.DataTypes,
|
||||
values,
|
||||
timestamps
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 构建表模型的Tablet
|
||||
/// </summary>
|
||||
/// <param name="metadata"></param>
|
||||
/// <param name="devicePath"></param>
|
||||
/// <param name="values"></param>
|
||||
/// <param name="timestamps"></param>
|
||||
/// <returns></returns>
|
||||
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath,
|
||||
List<List<object>> values, List<long> timestamps)
|
||||
{
|
||||
var tablet = new Tablet(
|
||||
devicePath,
|
||||
metadata.ColumnNames,
|
||||
metadata.ColumnCategories,
|
||||
metadata.DataTypes,
|
||||
values,
|
||||
timestamps
|
||||
);
|
||||
|
||||
return tablet;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -254,7 +203,7 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
var metadata = GetMetadata<T>();
|
||||
var sb = new StringBuilder("SELECT ");
|
||||
sb.AppendJoin(", ", metadata.ColumnNames);
|
||||
sb.Append($" FROM {options.TableNameOrTreePath}");
|
||||
sb.Append($" FROM {options.TableNameOrTagName}");
|
||||
|
||||
if (options.Conditions.Any())
|
||||
{
|
||||
@ -275,18 +224,9 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity
|
||||
{
|
||||
var metadata = GetMetadata<T>();
|
||||
var sb = new StringBuilder();
|
||||
var sb = new StringBuilder("DELETE ");
|
||||
|
||||
if (!_runtimeContext.UseTableSessionPool)
|
||||
{
|
||||
sb.Append("DELETE ");
|
||||
}
|
||||
else
|
||||
{
|
||||
sb.Append("DROP ");
|
||||
}
|
||||
|
||||
sb.Append($" FROM {options.TableNameOrTreePath}");
|
||||
sb.Append($" FROM {options.TableNameOrTagName}");
|
||||
|
||||
sb.AppendJoin(", ", metadata.ColumnNames);
|
||||
|
||||
@ -324,13 +264,13 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
/// <returns></returns>
|
||||
private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
|
||||
{
|
||||
var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}";
|
||||
var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTagName}";
|
||||
if (options.Conditions.Any())
|
||||
{
|
||||
countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
|
||||
}
|
||||
|
||||
var result = await _currentSession.ExecuteQueryStatementAsync(countQuery);
|
||||
var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery);
|
||||
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
|
||||
}
|
||||
|
||||
@ -375,6 +315,14 @@ namespace JiShe.CollectBus.IoTDBProvider
|
||||
return results;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 释放资源
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
_sessionPool?.Close().Wait();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 获取设备元数据
|
||||
/// </summary>
|
||||
|
||||
@ -1,36 +0,0 @@
|
||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
||||
{
|
||||
|
||||
/// <summary>
|
||||
/// 实现带缓存的Session工厂
|
||||
/// </summary>
|
||||
public class IoTDBSessionFactory : IIoTDBSessionFactory
|
||||
{
|
||||
private readonly IoTDBOptions _options;
|
||||
private readonly ConcurrentDictionary<bool, IIoTDBSessionPool> _pools = new();
|
||||
|
||||
public IoTDBSessionFactory(IOptions<IoTDBOptions> options)
|
||||
{
|
||||
_options = options.Value;
|
||||
}
|
||||
|
||||
public IIoTDBSessionPool GetSessionPool(bool useTableSession)
|
||||
{
|
||||
return _pools.GetOrAdd(useTableSession, key =>
|
||||
{
|
||||
return key
|
||||
? new TableSessionPoolAdapter(_options)
|
||||
: new SessionPoolAdapter(_options);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -30,8 +30,8 @@
|
||||
public string DeviceId { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// 当前时间戳,单位毫秒
|
||||
/// 当前时间戳,单位秒
|
||||
/// </summary>
|
||||
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,74 +0,0 @@
|
||||
using Apache.IoTDB.DataStructure;
|
||||
using Apache.IoTDB;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
||||
{
|
||||
/// <summary>
|
||||
/// 树模型连接池
|
||||
/// </summary>
|
||||
public class SessionPoolAdapter : IIoTDBSessionPool
|
||||
{
|
||||
private readonly SessionPool _sessionPool;
|
||||
private readonly IoTDBOptions _options;
|
||||
|
||||
public SessionPoolAdapter(IoTDBOptions options)
|
||||
{
|
||||
_options = options;
|
||||
_sessionPool = new SessionPool.Builder()
|
||||
.SetNodeUrl(options.ClusterList)
|
||||
.SetUsername(options.UserName)
|
||||
.SetPassword(options.Password)
|
||||
.SetFetchSize(options.FetchSize)
|
||||
.SetPoolSize(options.PoolSize)
|
||||
.Build();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 打开连接池
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task OpenAsync()
|
||||
{
|
||||
await _sessionPool.Open(false);
|
||||
if (_options.OpenDebugMode)
|
||||
{
|
||||
_sessionPool.OpenDebugMode(builder =>
|
||||
{
|
||||
builder.AddConsole();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入对齐时间序列数据
|
||||
/// </summary>
|
||||
/// <param name="tablet"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<int> InsertAsync(Tablet tablet)
|
||||
{
|
||||
return await _sessionPool.InsertAlignedTabletAsync(tablet);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 查询数据
|
||||
/// </summary>
|
||||
/// <param name="sql"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
|
||||
{
|
||||
return await _sessionPool.ExecuteQueryStatementAsync(sql);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,72 +0,0 @@
|
||||
using Apache.IoTDB.DataStructure;
|
||||
using Apache.IoTDB;
|
||||
using JiShe.CollectBus.IoTDBProvider.Interface;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JiShe.CollectBus.IoTDBProvider.Provider
|
||||
{
|
||||
/// <summary>
|
||||
/// 表模型Session连接池
|
||||
/// </summary>
|
||||
public class TableSessionPoolAdapter : IIoTDBSessionPool
|
||||
{
|
||||
private readonly TableSessionPool _sessionPool;
|
||||
private readonly IoTDBOptions _options;
|
||||
|
||||
public TableSessionPoolAdapter(IoTDBOptions options)
|
||||
{
|
||||
_options = options;
|
||||
_sessionPool = new TableSessionPool.Builder()
|
||||
.SetNodeUrls(options.ClusterList)
|
||||
.SetUsername(options.UserName)
|
||||
.SetPassword(options.Password)
|
||||
.SetFetchSize(options.FetchSize)
|
||||
.SetPoolSize(options.PoolSize)
|
||||
.SetDatabase(options.DataBaseName)
|
||||
.Build();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 打开连接池
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public async Task OpenAsync()
|
||||
{
|
||||
await _sessionPool.Open(false);
|
||||
if (_options.OpenDebugMode)
|
||||
{
|
||||
_sessionPool.OpenDebugMode(builder => builder.AddConsole());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 批量插入
|
||||
/// </summary>
|
||||
/// <param name="tablet"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<int> InsertAsync(Tablet tablet)
|
||||
{
|
||||
return await _sessionPool.InsertAsync(tablet);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 查询数据
|
||||
/// </summary>
|
||||
/// <param name="sql"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
|
||||
{
|
||||
return await _sessionPool.ExecuteQueryStatementAsync(sql);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user