106 lines
3.1 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// 树模型连接池
/// </summary>
public class SessionPoolAdapter : IIoTDbSessionPool
{
private readonly SessionPool _sessionPool;
private readonly IoTDbOptions _options;
/// <summary>
/// SessionPoolAdapter
/// </summary>
/// <param name="options"></param>
public SessionPoolAdapter(IoTDbOptions options)
{
_options = options;
_sessionPool = new SessionPool.Builder()
.SetNodeUrl(options.ClusterList)
.SetUsername(options.UserName)
.SetPassword(options.Password)
.SetZoneId(options.ZoneId)
.SetFetchSize(options.FetchSize)
.SetPoolSize(options.PoolSize)
.Build();
}
/// <summary>
/// 打开连接池
/// </summary>
/// <returns></returns>
public async Task OpenAsync()
{
await _sessionPool.Open(false);
if (_options.OpenDebugMode)
{
_sessionPool.OpenDebugMode(builder =>
{
builder.AddConsole();
});
}
}
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary>
/// 批量插入对齐时间序列数据
/// </summary>
/// <param name="tablet"></param>
/// <returns></returns>
public async Task<int> InsertAsync(Tablet tablet)
{
var result = await _sessionPool.InsertAlignedTabletAsync(tablet);
if (result != 0)
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}请检查IoTEntity继承子类属性索引是否有变动。");
}
return result;
}
/// <summary>
/// 查询数据
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
return result;
}
/// <summary>
/// 执行无返回结果SQL
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public async Task<int> ExecuteNonQueryStatementAsync(string sql)
{
var result = await _sessionPool.ExecuteNonQueryStatementAsync(sql);
return result;
}
public void Dispose()
{
_sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult();
}
}
}