From 60aad0032bdd7237782316dbd2db477a63a91e4a Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Tue, 22 Apr 2025 23:44:37 +0800
Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3IoTDB=E6=97=A0=E6=B3=95?=
=?UTF-8?q?=E9=87=8D=E5=A4=8D=E8=B0=83=E7=94=A8=E6=9F=A5=E8=AF=A2=E6=8E=A5?=
=?UTF-8?q?=E5=8F=A3=E7=9A=84=E9=97=AE=E9=A2=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Interface/IIoTDBSessionPool.cs | 6 ++++
.../Provider/IoTDBProvider.cs | 34 ++++++++++++++-----
.../Provider/SessionPoolAdapter.cs | 20 +++++++++--
.../Provider/TableSessionPoolAdapter.cs | 19 ++++++++++-
4 files changed, 67 insertions(+), 12 deletions(-)
diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
index 9587549..6bcbc5c 100644
--- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs
@@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
///
Task OpenAsync();
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ Task CloseAsync();
+
///
/// 插入数据
///
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
index 15d2df1..9c7c602 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
@@ -25,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
/// IoTDB数据源
///
- public class IoTDbProvider : IIoTDbProvider, IScopedDependency
+ public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{
private static readonly ConcurrentDictionary MetadataCache = new();
private readonly ILogger _logger;
@@ -496,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
- return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
+ if (result.HasNext())
+ {
+ await result.Close();
+ return 0;
+ }
+
+ var count = Convert.ToInt32(result.Next().Values[0]);
+ await result.Close();
+
+ return count;
}
///
@@ -512,8 +521,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata();
var properties = typeof(T).GetProperties();
- metadata.ColumnNames.Insert(0, "Timestamps");
- metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
+
+ var columns = new List() { "Timestamps" };
+ var dataTypes = new List() { TSDataType.TIMESTAMP };
+ columns.AddRange(metadata.ColumnNames);
+ dataTypes.AddRange(metadata.DataTypes);
+ //metadata.ColumnNames.Insert(0, "Timestamps");
+ //metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
while (dataSet.HasNext() && results.Count < pageSize)
{
@@ -523,17 +537,17 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps
};
- foreach (var measurement in metadata.ColumnNames)
+ foreach (var measurement in columns)
{
- int indexOf = metadata.ColumnNames.IndexOf(measurement);
+ int indexOf = columns.IndexOf(measurement);
var value = record.Values[indexOf];
- TSDataType tSDataType = metadata.DataTypes[indexOf];
+ TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null && !(value is System.DBNull))
- {
- dynamic tempValue = GetTSDataValue(tSDataType,value);
+ {
+ dynamic tempValue = GetTSDataValue(tSDataType, value);
if (measurement.ToLower().EndsWith("time"))
{
@@ -548,7 +562,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
results.Add(entity);
+
}
+ await dataSet.Close();
return results;
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
index c8c36ee..ee71cf1 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
@@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ public async Task CloseAsync()
+ {
+ if (_sessionPool == null)
+ {
+ return;
+ }
+ await _sessionPool.Close();
+ }
+
///
/// 批量插入对齐时间序列数据
///
@@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功,返回结果为:{result}");
}
-
+ //await CloseAsync();
return result;
}
@@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
+ var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
+ //await result.Close();
+ //await CloseAsync();
+ return result;
}
public void Dispose()
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
index a6e0f2a..dc4f0ee 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
@@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
+ ///
+ /// 关闭连接池
+ ///
+ ///
+ public async Task CloseAsync()
+ {
+ if (_sessionPool == null)
+ {
+ return;
+ }
+ await _sessionPool.Close();
+ }
+
///
/// 批量插入
///
@@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功,返回结果为:{result}");
}
+ //await CloseAsync();
return result;
}
@@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
+ var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
+ //await result.Close();
+ //await CloseAsync();
+ return result;
}
public void Dispose()