diff --git a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs index 9587549..6bcbc5c 100644 --- a/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs +++ b/modules/JiShe.CollectBus.IoTDB/Interface/IIoTDBSessionPool.cs @@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface /// Task OpenAsync(); + /// + /// 关闭连接池 + /// + /// + Task CloseAsync(); + /// /// 插入数据 /// diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 15d2df1..9c7c602 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -25,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// /// IoTDB数据源 /// - public class IoTDbProvider : IIoTDbProvider, IScopedDependency + public class IoTDbProvider : IIoTDbProvider, ITransientDependency { private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; @@ -496,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider } var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); - return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; + if (result.HasNext()) + { + await result.Close(); + return 0; + } + + var count = Convert.ToInt32(result.Next().Values[0]); + await result.Close(); + + return count; } /// @@ -512,8 +521,13 @@ namespace JiShe.CollectBus.IoTDB.Provider var metadata = await GetMetadata(); var properties = typeof(T).GetProperties(); - metadata.ColumnNames.Insert(0, "Timestamps"); - metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); + + var columns = new List() { "Timestamps" }; + var dataTypes = new List() { TSDataType.TIMESTAMP }; + columns.AddRange(metadata.ColumnNames); + dataTypes.AddRange(metadata.DataTypes); + //metadata.ColumnNames.Insert(0, "Timestamps"); + //metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); while (dataSet.HasNext() && results.Count < pageSize) { @@ -523,17 +537,17 @@ namespace JiShe.CollectBus.IoTDB.Provider Timestamps = record.Timestamps }; - foreach (var measurement in metadata.ColumnNames) + foreach (var measurement in columns) { - int indexOf = metadata.ColumnNames.IndexOf(measurement); + int indexOf = columns.IndexOf(measurement); var value = record.Values[indexOf]; - TSDataType tSDataType = metadata.DataTypes[indexOf]; + TSDataType tSDataType = dataTypes[indexOf]; var prop = properties.FirstOrDefault(p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); if (prop != null && !(value is System.DBNull)) - { - dynamic tempValue = GetTSDataValue(tSDataType,value); + { + dynamic tempValue = GetTSDataValue(tSDataType, value); if (measurement.ToLower().EndsWith("time")) { @@ -548,7 +562,9 @@ namespace JiShe.CollectBus.IoTDB.Provider } results.Add(entity); + } + await dataSet.Close(); return results; } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index c8c36ee..ee71cf1 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 关闭连接池 + /// + /// + public async Task CloseAsync() + { + if (_sessionPool == null) + { + return; + } + await _sessionPool.Close(); + } + /// /// 批量插入对齐时间序列数据 /// @@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider { throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功,返回结果为:{result}"); } - + //await CloseAsync(); return result; } @@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); + var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); + //await result.Close(); + //await CloseAsync(); + return result; } public void Dispose() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index a6e0f2a..dc4f0ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider } } + /// + /// 关闭连接池 + /// + /// + public async Task CloseAsync() + { + if (_sessionPool == null) + { + return; + } + await _sessionPool.Close(); + } + /// /// 批量插入 /// @@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功,返回结果为:{result}"); } + //await CloseAsync(); return result; } @@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); + var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); + //await result.Close(); + //await CloseAsync(); + return result; } public void Dispose()