解决IoTDB无法重复调用查询接口的问题

This commit is contained in:
ChenYi 2025-04-22 23:44:37 +08:00
parent 26f6796409
commit 60aad0032b
4 changed files with 67 additions and 12 deletions

View File

@ -13,6 +13,12 @@ namespace JiShe.CollectBus.IoTDB.Interface
/// <returns></returns> /// <returns></returns>
Task OpenAsync(); Task OpenAsync();
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
Task CloseAsync();
/// <summary> /// <summary>
/// 插入数据 /// 插入数据
/// </summary> /// </summary>

View File

@ -25,7 +25,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary> /// <summary>
/// IoTDB数据源 /// IoTDB数据源
/// </summary> /// </summary>
public class IoTDbProvider : IIoTDbProvider, IScopedDependency public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{ {
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new(); private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger; private readonly ILogger<IoTDbProvider> _logger;
@ -496,7 +496,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; if (result.HasNext())
{
await result.Close();
return 0;
}
var count = Convert.ToInt32(result.Next().Values[0]);
await result.Close();
return count;
} }
/// <summary> /// <summary>
@ -512,8 +521,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var properties = typeof(T).GetProperties(); var properties = typeof(T).GetProperties();
metadata.ColumnNames.Insert(0, "Timestamps");
metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
columns.AddRange(metadata.ColumnNames);
dataTypes.AddRange(metadata.DataTypes);
//metadata.ColumnNames.Insert(0, "Timestamps");
//metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP);
while (dataSet.HasNext() && results.Count < pageSize) while (dataSet.HasNext() && results.Count < pageSize)
{ {
@ -523,17 +537,17 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps Timestamps = record.Timestamps
}; };
foreach (var measurement in metadata.ColumnNames) foreach (var measurement in columns)
{ {
int indexOf = metadata.ColumnNames.IndexOf(measurement); int indexOf = columns.IndexOf(measurement);
var value = record.Values[indexOf]; var value = record.Values[indexOf];
TSDataType tSDataType = metadata.DataTypes[indexOf]; TSDataType tSDataType = dataTypes[indexOf];
var prop = properties.FirstOrDefault(p => var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null && !(value is System.DBNull)) if (prop != null && !(value is System.DBNull))
{ {
dynamic tempValue = GetTSDataValue(tSDataType,value); dynamic tempValue = GetTSDataValue(tSDataType, value);
if (measurement.ToLower().EndsWith("time")) if (measurement.ToLower().EndsWith("time"))
{ {
@ -548,7 +562,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
results.Add(entity); results.Add(entity);
} }
await dataSet.Close();
return results; return results;
} }

View File

@ -47,6 +47,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
} }
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary> /// <summary>
/// 批量插入对齐时间序列数据 /// 批量插入对齐时间序列数据
/// </summary> /// </summary>
@ -59,7 +72,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}");
} }
//await CloseAsync();
return result; return result;
} }
@ -70,7 +83,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); var result = await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
} }
public void Dispose() public void Dispose()

View File

@ -45,6 +45,19 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
} }
/// <summary>
/// 关闭连接池
/// </summary>
/// <returns></returns>
public async Task CloseAsync()
{
if (_sessionPool == null)
{
return;
}
await _sessionPool.Close();
}
/// <summary> /// <summary>
/// 批量插入 /// 批量插入
/// </summary> /// </summary>
@ -58,6 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}"); throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}");
} }
//await CloseAsync();
return result; return result;
} }
@ -68,7 +82,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); var result = await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
//await result.Close();
//await CloseAsync();
return result;
} }
public void Dispose() public void Dispose()