2025-04-02 17:23:52 +08:00

295 lines
10 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{
/// <summary>
/// IoTDB数据源
/// </summary>
public class IoTDBProvider : IIoTDBProvider, IDisposable
{
private readonly IoTDBOptions _options;
private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();
public IoTDBProvider(IOptions<IoTDBOptions> options)
{
_options = options.Value;
_sessionPool = new TableSessionPool.Builder()
.SetNodeUrls(_options.ClusterList)
.SetUsername(_options.UserName)
.SetPassword(_options.Password)
.SetFetchSize(_options.PoolSize)
.Build();
_sessionPool.Open(false).Wait();
}
/// <summary>
/// 获取设备元数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
private DeviceMetadata GetMetadata<T>() where T : IoTEntity
{
return _metadataCache.GetOrAdd(typeof(T), type =>
{
var metadata = new DeviceMetadata();
foreach (var prop in type.GetProperties())
{
//标签列
var attrTAG = prop.GetCustomAttribute<TAGColumnAttribute>();
if (attrTAG != null)
{
metadata.ColumnCategories.Add(ColumnCategory.TAG);
}
//属性列
var attrATTRIBUTE = prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>();
if (attrATTRIBUTE != null)
{
metadata.ColumnCategories.Add(ColumnCategory.ATTRIBUTE);
}
//数据列
var attrFIELD = prop.GetCustomAttribute<FIELDColumnAttribute>();
if (attrFIELD != null)
{
metadata.ColumnCategories.Add(ColumnCategory.FIELD);
metadata.Measurements.Add(prop.Name);
metadata.DataTypes.Add(GetDataTypeFromStr(prop.PropertyType.Name));
}
}
return metadata;
});
}
/// <summary>
/// 插入数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entity"></param>
/// <returns></returns>
public async Task InsertAsync<T>(T entity) where T : IoTEntity
{
var metadata = GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata);
await _sessionPool.InsertAsync(tablet);
}
/// <summary>
/// 批量插入数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entities"></param>
/// <returns></returns>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
{
var metadata = GetMetadata<T>();
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
await _sessionPool.InsertAsync(tablet);
}
}
/// <summary>
/// 构建表模型
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entities"></param>
/// <param name="metadata"></param>
/// <returns></returns>
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{
var deviceId = DevicePathBuilder.GetDeviceId(entities.First());
var timestamps = new List<long>();
var values = new List<List<object>>();
foreach (var entity in entities)
{
timestamps.Add(entity.Timestamps);
var rowValues = new List<object>();
foreach (var measurement in metadata.Measurements)
{
var value = typeof(T).GetProperty(measurement)?.GetValue(entity);
if(value == null)
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空不符合IoTDB设计标准请赋值以后重新处理。");
}
rowValues.Add(value);
}
values.Add(rowValues);
}
return new Tablet(
deviceId,
metadata.Measurements,
metadata.DataTypes,
values,
timestamps
);
}
/// <summary>
/// 查询数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="options"></param>
/// <returns></returns>
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{
var query = BuildQuery<T>(options);
var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);
var result = new PagedResult<T>
{
TotalCount = await GetTotalCount<T>(options),
Items = ParseResults<T>(sessionDataSet, options.PageSize)
};
return result;
}
/// <summary>
/// 构建查询语句
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="options"></param>
/// <returns></returns>
private string BuildQuery<T>(QueryOptions options) where T : IoTEntity
{
var metadata = GetMetadata<T>();
var sb = new StringBuilder("SELECT ");
sb.AppendJoin(", ", metadata.Measurements);
sb.Append($" FROM {DevicePathBuilder.GetTableName<T>()}");
if (options.Conditions.Any())
{
sb.Append(" WHERE ");
sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition));
}
sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}");
return sb.ToString();
}
/// <summary>
/// 将查询条件转换为SQL语句
/// </summary>
/// <param name="condition"></param>
/// <returns></returns>
/// <exception cref="NotSupportedException"></exception>
private string TranslateCondition(QueryCondition condition)
{
return condition.Operator switch
{
">" => $"{condition.Field} > {condition.Value}",
"<" => $"{condition.Field} < {condition.Value}",
"=" => $"{condition.Field} = '{condition.Value}'",
_ => throw new NotSupportedException($"Operator {condition.Operator} not supported")
};
}
/// <summary>
/// 获取查询条件的总数量
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="options"></param>
/// <returns></returns>
private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
{
var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.GetTableName<T>()}";
if (options.Conditions.Any())
{
countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
}
var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
}
/// <summary>
/// 解析查询结果
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="dataSet"></param>
/// <param name="pageSize"></param>
/// <returns></returns>
private IEnumerable<T> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
{
var results = new List<T>();
var metadata = GetMetadata<T>();
var properties = typeof(T).GetProperties();
while (dataSet.HasNext() && results.Count < pageSize)
{
var record = dataSet.Next();
var entity = new T
{
Timestamps = record.Timestamps
};
foreach (var measurement in metadata.Measurements)
{
var value = record.Values;
var prop = properties.FirstOrDefault(p =>
p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase));
if (prop != null)
{
typeof(T).GetProperty(measurement)?.SetValue(entity, value);
}
}
results.Add(entity);
}
return results;
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
_sessionPool?.Close().Wait();
}
private TSDataType GetDataTypeFromStr(string str)
{
return str switch
{
"BOOLEAN" => TSDataType.BOOLEAN,
"INT32" => TSDataType.INT32,
"INT64" => TSDataType.INT64,
"FLOAT" => TSDataType.FLOAT,
"DOUBLE" => TSDataType.DOUBLE,
"TEXT" => TSDataType.TEXT,
"NULLTYPE" => TSDataType.NONE,
"TIMESTAMP" => TSDataType.TIMESTAMP,
"DATE" => TSDataType.DATE,
"BLOB" => TSDataType.BLOB,
"STRING" => TSDataType.STRING,
_ => TSDataType.STRING
};
}
}
}