dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
15 changed files with 464 additions and 198 deletions
Showing only changes of commit 7696b7379b - Show all commits

View File

@ -11,6 +11,7 @@ using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -18,17 +19,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
{ {
private readonly IIoTDBProvider _iotDBProvider; private readonly IIoTDBProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext;
private readonly IoTDBOptions _options; private readonly IoTDBOptions _options;
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options) public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options,
IoTDBRuntimeContext dbContext)
{ {
_iotDBProvider = iotDBProvider; _iotDBProvider = iotDBProvider;
_options = options.Value; _options = options.Value;
_dbContext = dbContext;
} }
[HttpGet] [HttpGet]
public async Task AddReadingAsync(int buildTabletMode = 1) public async Task UseSessionPool()
{ {
ElectricityMeter meter = new ElectricityMeter() ElectricityMeter meter = new ElectricityMeter()
{ {
@ -38,95 +42,31 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
Current = 10, Current = 10,
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectCode = "10059", ProjectCode = "10059",
Voltage = 10 Voltage = 10,
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter, buildTabletMode); await _iotDBProvider.InsertAsync(meter);
} }
[HttpGet] [HttpGet]
public async Task AddReading2Async() public async Task UseTableSessionPool()
{ {
var tableSessionPool = new TableSessionPool.Builder() //_dbContext.UseTableSessionPool = true;
.SetNodeUrls(_options.ClusterList) _iotDBProvider.SwitchSessionPool(true);
.SetUsername(_options.UserName)
.SetPassword(_options.Password)
.SetFetchSize(1024)
.Build();
await tableSessionPool.Open(false); ElectricityMeter meter = new ElectricityMeter()
await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1");
await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2");
//await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
//// or use full qualified table name
//await tableSessionPool.ExecuteNonQueryStatementAsync(
// "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)");
//await tableSessionPool.ExecuteNonQueryStatementAsync(
// "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)");
//// show tables from current database
//var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
//res.ShowTableNames();
//while (res.HasNext()) Console.WriteLine(res.Next());
//await res.Close();
//// show tables by specifying another database
//// using SHOW tables FROM
//res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1");
//res.ShowTableNames();
//while (res.HasNext()) Console.WriteLine(res.Next());
//await res.Close();
var tableName = "testTable1";
List<string> columnNames =
new List<string> {
"region_id",
"plant_id",
"device_id",
"model",
"temperature",
"humidity" };
List<TSDataType> dataTypes =
new List<TSDataType>{
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.FLOAT,
TSDataType.DOUBLE};
List<ColumnCategory> columnCategories =
new List<ColumnCategory>{
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.ATTRIBUTE,
ColumnCategory.FIELD,
ColumnCategory.FIELD};
var values = new List<List<object>> { };
var timestamps = new List<long> { };
for (long timestamp = 0; timestamp < 100; timestamp++)
{ {
timestamps.Add(timestamp); SystemName = "energy",
values.Add(new List<object> { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp DeviceId = "402440506",
}); DeviceType = "Ammeter",
} Current = 10,
var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps); MeterModel = "DDZY-1980",
await tableSessionPool.InsertAsync(tablet); ProjectCode = "10059",
Voltage = 10,
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
//var res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 " };
// + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'"); await _iotDBProvider.InsertAsync(meter);
// res.ShowTableNames();
// while (res.HasNext()) Console.WriteLine(res.Next());
// await res.Close();
await tableSessionPool.Close();
} }

View File

@ -40,7 +40,7 @@
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
}, },
"Redis": { "Redis": {
"Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "118.190.144.92:6380,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "15"
}, },
@ -78,7 +78,8 @@
"Password": "root", "Password": "root",
"ClusterList": [ "192.168.56.102:6667" ], "ClusterList": [ "192.168.56.102:6667" ],
"PoolSize": 2, "PoolSize": 2,
"DataBaseName": "root.energy2", "DataBaseName": "energy",
"OpenDebugMode": true "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
} }
} }

View File

@ -1,4 +1,7 @@
using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.DependencyInjection;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -13,7 +16,16 @@ namespace JiShe.CollectBus.IoTDBProvider
public override void ConfigureServices(ServiceConfigurationContext context) public override void ConfigureServices(ServiceConfigurationContext context)
{ {
context.Services.Configure<IoTDBOptions>(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions))); context.Services.Configure<IoTDBOptions>(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions)));
context.Services.AddSingleton<IIoTDBProvider, IoTDBProvider>();
// 注册上下文为Scoped
context.Services.AddScoped<IoTDBRuntimeContext>();
// 注册Session工厂
context.Services.AddSingleton<IIoTDBSessionFactory, IoTDBSessionFactory>();
// 注册Provider
context.Services.AddScoped<IIoTDBProvider, IoTDBProvider>();
} }
} }
} }

View File

@ -0,0 +1,36 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Context
{
/// <summary>
/// IoTDB SessionPool 运行时上下文
/// </summary>
public class IoTDBRuntimeContext
{
private readonly bool _defaultValue;
public IoTDBRuntimeContext(IOptions<IoTDBOptions> options)
{
_defaultValue = options.Value.UseTableSessionPoolByDefault;
UseTableSessionPool = _defaultValue;
}
/// <summary>
/// 是否使用表模型存储, 默认false使用tree模型存储
/// </summary>
public bool UseTableSessionPool { get; set; }
/// <summary>
/// 重置为默认值
/// </summary>
public void ResetToDefault()
{
UseTableSessionPool = _defaultValue;
}
}
}

View File

@ -7,27 +7,31 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider namespace JiShe.CollectBus.IoTDBProvider
{ {
/// <summary> /// <summary>
/// IoTDB数据源 /// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置
/// </summary> /// </summary>
public interface IIoTDBProvider public interface IIoTDBProvider
{ {
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="useTableSession">是否使用表模型</param>
void SwitchSessionPool(bool useTableSession);
/// <summary> /// <summary>
/// 插入数据 /// 插入数据
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entity"></param> /// <param name="entity"></param>
/// <param name="buildTabletMode">构建表模型方式1 根据实体《T》直接显示指定Tag2根据实体《T》的名称指定表名</param>
/// <returns></returns> /// <returns></returns>
Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity; Task InsertAsync<T>(T entity) where T : IoTEntity;
/// <summary> /// <summary>
/// 批量插入数据 /// 批量插入数据
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entities"></param> /// <param name="entities"></param>
/// <param name="buildTabletMode">构建表模型方式1 根据实体《T》直接显示指定Tag2根据实体《T》的名称指定表名</param>
/// <returns></returns> /// <returns></returns>
Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity; Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity;
/// <summary> /// <summary>

View File

@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface
{
/// <summary>
/// Session 工厂接口
/// </summary>
public interface IIoTDBSessionFactory
{
IIoTDBSessionPool GetSessionPool(bool useTableSession);
}
}

View File

@ -0,0 +1,35 @@
using Apache.IoTDB.DataStructure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Interface
{
/// <summary>
/// Session 连接池
/// </summary>
public interface IIoTDBSessionPool : IDisposable
{
/// <summary>
/// 打开连接池
/// </summary>
/// <returns></returns>
Task OpenAsync();
/// <summary>
/// 插入数据
/// </summary>
/// <param name="tablet"></param>
/// <returns></returns>
Task<int> InsertAsync(Tablet tablet);
/// <summary>
/// 查询数据
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
Task<SessionDataSet> ExecuteQueryStatementAsync(string sql);
}
}

View File

@ -12,7 +12,7 @@ namespace JiShe.CollectBus.IoTDBProvider
public class IoTDBOptions public class IoTDBOptions
{ {
/// <summary> /// <summary>
/// 数据库名称 /// 数据库名称,表模型才有,树模型为空
/// </summary> /// </summary>
public string DataBaseName { get; set; } public string DataBaseName { get; set; }
@ -43,5 +43,10 @@ namespace JiShe.CollectBus.IoTDBProvider
/// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。 /// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。
/// </summary> /// </summary>
public bool OpenDebugMode { get; set;} public bool OpenDebugMode { get; set;}
/// <summary>
/// 是否使用表模型存储, 默认false使用tree模型存储
/// </summary>
public bool UseTableSessionPoolByDefault { get; set; } = false;
} }
} }

View File

@ -12,18 +12,20 @@ namespace JiShe.CollectBus.IoTDBProvider
public class QueryOptions public class QueryOptions
{ {
/// <summary> /// <summary>
/// 表名或标签名 /// 表模型的表名称或者树模型的设备路径
/// </summary> /// </summary>
public required string TableNameOrTagName { get; set; } public required string TableNameOrTreePath { get; set; }
/// <summary> /// <summary>
/// 分页 /// 分页
/// </summary> /// </summary>
public int Page { get; set; } public int Page { get; set; }
/// <summary> /// <summary>
/// 分页大小 /// 分页大小
/// </summary> /// </summary>
public int PageSize { get; set; } public int PageSize { get; set; }
/// <summary> /// <summary>
/// 查询条件 /// 查询条件
/// </summary> /// </summary>

View File

@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entity"></param> /// <param name="entity"></param>
/// <returns></returns> /// <returns></returns>
public static string GetDeviceId<T>(T entity) where T : IoTEntity public static string GetDevicePath<T>(T entity) where T : IoTEntity
{ {
return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`"; return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`";
} }

View File

@ -2,6 +2,9 @@
using Apache.IoTDB.DataStructure; using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
@ -18,40 +21,54 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <summary> /// <summary>
/// IoTDB数据源 /// IoTDB数据源
/// </summary> /// </summary>
public class IoTDBProvider : IIoTDBProvider, IDisposable public class IoTDBProvider : IIoTDBProvider
{ {
private readonly IoTDBOptions _options; private IIoTDBSessionPool _currentSession;
private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new(); private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();
private readonly ILogger<IoTDBProvider> _logger; private readonly ILogger<IoTDBProvider> _logger;
private readonly IIoTDBSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext;
public IoTDBProvider(IOptions<IoTDBOptions> options, ILogger<IoTDBProvider> logger) public IoTDBProvider(
ILogger<IoTDBProvider> logger,
IIoTDBSessionFactory sessionFactory,
IoTDBRuntimeContext runtimeContext)
{ {
_options = options.Value;
_sessionPool = new TableSessionPool.Builder()
#if DEBUG
.SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[0])
.SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[1])
#else
.SetNodeUrls(_options.ClusterList)
#endif
.SetUsername(_options.UserName)
.SetPassword(_options.Password)
.SetFetchSize(_options.FetchSize)
.SetPoolSize(_options.PoolSize)
.SetDatabase(_options.DataBaseName)
.Build();
_sessionPool.Open(false).Wait();
if (_options.OpenDebugMode)
{
_sessionPool.OpenDebugMode(builder =>
{
builder.AddConsole();
});
}
_logger = logger; _logger = logger;
_sessionFactory = sessionFactory;
_runtimeContext = runtimeContext;
RefreshSessionPool();
}
private void RefreshSessionPool()
{
try
{
_currentSession?.Dispose();
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
_currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception ex)
{
//自动回退到备用SessionPool
_logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败回退到默认配置{ex.Serialize()}");
_runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool;
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
}
}
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="useTableSession"></param>
public void SwitchSessionPool(bool useTableSession)
{
if (_runtimeContext.UseTableSessionPool != useTableSession)
{
_runtimeContext.UseTableSessionPool = useTableSession;
RefreshSessionPool();
}
} }
/// <summary> /// <summary>
@ -60,24 +77,13 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entity"></param> /// <param name="entity"></param>
/// <returns></returns> /// <returns></returns>
public async Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity public async Task InsertAsync<T>(T entity) where T : IoTEntity
{ {
var metadata = GetMetadata<T>(); var metadata = GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode); var tablet = BuildTablet(new[] { entity }, metadata);
int result = 0;
if (buildTabletMode == 1) int result = await _currentSession.InsertAsync(tablet);
{
result = await _sessionPool.InsertAsync(tablet);
}
else if(buildTabletMode == 2)
{
result = await _sessionPool.InsertAsync(tablet);
}
else
{
throw new Exception($"{nameof(InsertAsync)} 表模型{typeof(T).Name}记录入库时buildTabletMode参数值不正确请赋值以后重新处理。");
}
if (result <= 0) if (result <= 0)
{ {
@ -89,9 +95,8 @@ namespace JiShe.CollectBus.IoTDBProvider
/// 批量插入数据 /// 批量插入数据
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entities"></param>
/// <returns></returns> /// <returns></returns>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
{ {
var metadata = GetMetadata<T>(); var metadata = GetMetadata<T>();
@ -100,8 +105,8 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var batch in batches) foreach (var batch in batches)
{ {
var tablet = BuildTablet(batch, metadata, buildTabletMode); var tablet = BuildTablet(batch, metadata);
var result = await _sessionPool.InsertAsync(tablet); var result = await _currentSession.InsertAsync(tablet);
if (result <= 0) if (result <= 0)
{ {
_logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。"); _logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。");
@ -119,7 +124,7 @@ namespace JiShe.CollectBus.IoTDBProvider
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
{ {
var query = BuildDeleteSQL<T>(options); var query = BuildDeleteSQL<T>(options);
var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
if (!sessionDataSet.HasNext()) if (!sessionDataSet.HasNext())
{ {
@ -132,9 +137,6 @@ namespace JiShe.CollectBus.IoTDBProvider
return row.Values[0]; return row.Values[0];
} }
/// <summary> /// <summary>
/// 查询数据 /// 查询数据
/// </summary> /// </summary>
@ -144,7 +146,7 @@ namespace JiShe.CollectBus.IoTDBProvider
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new() public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{ {
var query = BuildQuerySQL<T>(options); var query = BuildQuerySQL<T>(options);
var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
var result = new PagedResult<T> var result = new PagedResult<T>
{ {
@ -156,17 +158,17 @@ namespace JiShe.CollectBus.IoTDBProvider
} }
/// <summary> /// <summary>
/// 构建表模型 /// 构建Tablet
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="entities">表实体</param> /// <param name="entities">表实体</param>
/// <param name="metadata">设备元数据</param> /// <param name="metadata">设备元数据</param></param>
/// <param name="buildTabletMode">构建表模型方式1 根据实体《T》直接显示指定Tag2根据实体《T》的名称指定表名</param>
/// <returns></returns> /// <returns></returns>
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata, int buildTabletMode) where T : IoTEntity private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{ {
var timestamps = new List<long>(); var timestamps = new List<long>();
var values = new List<List<object>>(); var values = new List<List<object>>();
var devicePaths = new HashSet<string>();
foreach (var entity in entities) foreach (var entity in entities)
{ {
@ -175,40 +177,70 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var measurement in metadata.ColumnNames) foreach (var measurement in metadata.ColumnNames)
{ {
var value = typeof(T).GetProperty(measurement)?.GetValue(entity); var value = typeof(T).GetProperty(measurement)?.GetValue(entity);
if (value == null) rowValues.Add(value ?? new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空不符合IoTDB设计标准请赋值以后重新处理。"));
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空不符合IoTDB设计标准请赋值以后重新处理。");
}
rowValues.Add(value);
} }
values.Add(rowValues); values.Add(rowValues);
if (!_runtimeContext.UseTableSessionPool)//树模型
{
devicePaths.Add(DevicePathBuilder.GetDevicePath(entity));
}
else
{
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
}
} }
if (buildTabletMode == 1) if (devicePaths.Count > 1)
{
throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。");
}
return _runtimeContext.UseTableSessionPool
? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps)
: BuildSessionTablet(metadata, devicePaths.First(), values, timestamps);
}
/// <summary>
/// 构建tree模型的Tablet
/// </summary>
/// <param name="metadata"></param>
/// <param name="devicePath"></param>
/// <param name="values"></param>
/// <param name="timestamps"></param>
/// <returns></returns>
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath,
List<List<object>> values, List<long> timestamps)
{ {
return new Tablet( return new Tablet(
DevicePathBuilder.GetDeviceId(entities.First()), devicePath,
metadata.ColumnNames, metadata.ColumnNames,
metadata.DataTypes, metadata.DataTypes,
values, values,
timestamps timestamps
); );
} }
else if (buildTabletMode == 2)
/// <summary>
/// 构建表模型的Tablet
/// </summary>
/// <param name="metadata"></param>
/// <param name="devicePath"></param>
/// <param name="values"></param>
/// <param name="timestamps"></param>
/// <returns></returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath,
List<List<object>> values, List<long> timestamps)
{ {
return new Tablet( var tablet = new Tablet(
DevicePathBuilder.GetTableName<T>(), devicePath,
metadata.ColumnNames, metadata.ColumnNames,
metadata.ColumnCategories, metadata.ColumnCategories,
metadata.DataTypes, metadata.DataTypes,
values, values,
timestamps timestamps
); );
}
else return tablet;
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时buildTabletMode参数值不正确请赋值以后重新处理。");
}
} }
/// <summary> /// <summary>
@ -222,7 +254,7 @@ namespace JiShe.CollectBus.IoTDBProvider
var metadata = GetMetadata<T>(); var metadata = GetMetadata<T>();
var sb = new StringBuilder("SELECT "); var sb = new StringBuilder("SELECT ");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTagName}"); sb.Append($" FROM {options.TableNameOrTreePath}");
if (options.Conditions.Any()) if (options.Conditions.Any())
{ {
@ -243,9 +275,18 @@ namespace JiShe.CollectBus.IoTDBProvider
private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity
{ {
var metadata = GetMetadata<T>(); var metadata = GetMetadata<T>();
var sb = new StringBuilder("DELETE "); var sb = new StringBuilder();
sb.Append($" FROM {options.TableNameOrTagName}"); if (!_runtimeContext.UseTableSessionPool)
{
sb.Append("DELETE ");
}
else
{
sb.Append("DROP ");
}
sb.Append($" FROM {options.TableNameOrTreePath}");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
@ -283,13 +324,13 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <returns></returns> /// <returns></returns>
private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
{ {
var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTagName}"; var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}";
if (options.Conditions.Any()) if (options.Conditions.Any())
{ {
countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
} }
var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery); var result = await _currentSession.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
} }
@ -334,14 +375,6 @@ namespace JiShe.CollectBus.IoTDBProvider
return results; return results;
} }
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
_sessionPool?.Close().Wait();
}
/// <summary> /// <summary>
/// 获取设备元数据 /// 获取设备元数据
/// </summary> /// </summary>

View File

@ -0,0 +1,36 @@
using JiShe.CollectBus.IoTDBProvider.Interface;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider.Provider
{
/// <summary>
/// 实现带缓存的Session工厂
/// </summary>
public class IoTDBSessionFactory : IIoTDBSessionFactory
{
private readonly IoTDBOptions _options;
private readonly ConcurrentDictionary<bool, IIoTDBSessionPool> _pools = new();
public IoTDBSessionFactory(IOptions<IoTDBOptions> options)
{
_options = options.Value;
}
public IIoTDBSessionPool GetSessionPool(bool useTableSession)
{
return _pools.GetOrAdd(useTableSession, key =>
{
return key
? new TableSessionPoolAdapter(_options)
: new SessionPoolAdapter(_options);
});
}
}
}

View File

@ -30,8 +30,8 @@
public string DeviceId { get; set; } public string DeviceId { get; set; }
/// <summary> /// <summary>
/// 当前时间戳,单位 /// 当前时间戳,单位
/// </summary> /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
} }
} }

View File

@ -0,0 +1,74 @@
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDBProvider.Interface;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider
{
/// <summary>
/// 树模型连接池
/// </summary>
public class SessionPoolAdapter : IIoTDBSessionPool
{
private readonly SessionPool _sessionPool;
private readonly IoTDBOptions _options;
public SessionPoolAdapter(IoTDBOptions options)
{
_options = options;
_sessionPool = new SessionPool.Builder()
.SetNodeUrl(options.ClusterList)
.SetUsername(options.UserName)
.SetPassword(options.Password)
.SetFetchSize(options.FetchSize)
.SetPoolSize(options.PoolSize)
.Build();
}
/// <summary>
/// 打开连接池
/// </summary>
/// <returns></returns>
public async Task OpenAsync()
{
await _sessionPool.Open(false);
if (_options.OpenDebugMode)
{
_sessionPool.OpenDebugMode(builder =>
{
builder.AddConsole();
});
}
}
/// <summary>
/// 批量插入对齐时间序列数据
/// </summary>
/// <param name="tablet"></param>
/// <returns></returns>
public async Task<int> InsertAsync(Tablet tablet)
{
return await _sessionPool.InsertAlignedTabletAsync(tablet);
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
return await _sessionPool.ExecuteQueryStatementAsync(sql);
}
public void Dispose()
{
_sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult();
}
}
}

View File

@ -0,0 +1,72 @@
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using JiShe.CollectBus.IoTDBProvider.Interface;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDBProvider.Provider
{
/// <summary>
/// 表模型Session连接池
/// </summary>
public class TableSessionPoolAdapter : IIoTDBSessionPool
{
private readonly TableSessionPool _sessionPool;
private readonly IoTDBOptions _options;
public TableSessionPoolAdapter(IoTDBOptions options)
{
_options = options;
_sessionPool = new TableSessionPool.Builder()
.SetNodeUrls(options.ClusterList)
.SetUsername(options.UserName)
.SetPassword(options.Password)
.SetFetchSize(options.FetchSize)
.SetPoolSize(options.PoolSize)
.SetDatabase(options.DataBaseName)
.Build();
}
/// <summary>
/// 打开连接池
/// </summary>
/// <returns></returns>
public async Task OpenAsync()
{
await _sessionPool.Open(false);
if (_options.OpenDebugMode)
{
_sessionPool.OpenDebugMode(builder => builder.AddConsole());
}
}
/// <summary>
/// 批量插入
/// </summary>
/// <param name="tablet"></param>
/// <returns></returns>
public async Task<int> InsertAsync(Tablet tablet)
{
return await _sessionPool.InsertAsync(tablet);
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
return await _sessionPool.ExecuteQueryStatementAsync(sql);
}
public void Dispose()
{
_sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult();
}
}
}