2025-04-21 10:17:40 +08:00

623 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// IoTDB数据源
/// </summary>
public class IoTDbProvider : IIoTDbProvider, IScopedDependency
{
/// <summary>
/// Static cache of device metadata keyed by entity type; shared by every provider instance.
/// </summary>
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache =
    new ConcurrentDictionary<Type, DeviceMetadata>();

/// <summary>Logger supplied through the constructor.</summary>
private readonly ILogger<IoTDbProvider> _logger;

/// <summary>Factory used to resolve the session pool.</summary>
private readonly IIoTDbSessionFactory _sessionFactory;

/// <summary>Runtime context; its UseTableSessionPool flag selects which session pool is used.</summary>
private readonly IoTDbRuntimeContext _runtimeContext;

/// <summary>
/// Session pool for the current mode. It is requested from the factory on every access,
/// passing the runtime context's UseTableSessionPool flag.
/// </summary>
private IIoTDbSessionPool CurrentSession
{
    get { return _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); }
}
/// <summary>
/// Creates the provider and stores its collaborators.
/// </summary>
/// <param name="logger">Logger used for warnings emitted by this provider.</param>
/// <param name="sessionFactory">Factory that hands out the session pool.</param>
/// <param name="runtimeContext">Runtime context carrying the UseTableSessionPool flag.</param>
public IoTDbProvider(
    ILogger<IoTDbProvider> logger,
    IIoTDbSessionFactory sessionFactory,
    IoTDbRuntimeContext runtimeContext)
{
    (_logger, _sessionFactory, _runtimeContext) = (logger, sessionFactory, runtimeContext);
}
/// <summary>
/// Inserts a single entity by wrapping it in a one-row tablet and sending it through the current session pool.
/// </summary>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="entity">The entity to write.</param>
/// <returns>A task that completes when the insert call has returned. The call's result is not inspected.</returns>
public async Task InsertAsync<T>(T entity) where T : IoTEntity
{
    DeviceMetadata deviceMetadata = GetMetadata<T>();
    T[] singleRow = { entity };
    Tablet tablet = BuildTablet(singleRow, deviceMetadata);

    await CurrentSession.InsertAsync(tablet);
}
/// <summary>
/// Inserts many entities, splitting them into chunks of 1000 rows and sending one tablet per chunk, sequentially.
/// </summary>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="entities">Entities to write. Each chunk is turned into its own tablet.</param>
/// <returns>A task that completes once every chunk has been sent. Insert results are not inspected.</returns>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
{
    const int rowsPerTablet = 1000;

    // Metadata is resolved once and reused for every chunk.
    DeviceMetadata deviceMetadata = GetMetadata<T>();

    foreach (T[] chunk in entities.Chunk(rowsPerTablet))
    {
        Tablet tablet = BuildTablet(chunk, deviceMetadata);
        await CurrentSession.InsertAsync(tablet);
    }
}
/// <summary>
/// Builds a delete statement for <typeparamref name="T"/> and executes it through the current session pool.
/// </summary>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="options">Target path/table and filter conditions.</param>
/// <returns>
/// The first value of the first returned row, or the boxed integer 0 (after logging a warning)
/// when the result set is empty.
/// </returns>
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
{
    var deleteSql = BuildDeleteSQL<T>(options);

    // NOTE(review): the statement is sent through ExecuteQueryStatementAsync; confirm whether the
    // session pool exposes a dedicated non-query call that should be used for deletes instead.
    var dataSet = await CurrentSession.ExecuteQueryStatementAsync(deleteSql);

    if (dataSet.HasNext())
    {
        // Only the first row is read; its first value is returned as-is.
        var firstRow = dataSet.Next();
        return firstRow.Values[0];
    }

    _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
    return 0;
}
/// <summary>
/// Runs a paged query for <typeparamref name="T"/> and returns the page together with the total row count.
/// </summary>
/// <typeparam name="T">Entity type; must derive from IoTEntity and have a parameterless constructor.</typeparam>
/// <param name="options">Target path/table, filter conditions and paging values.</param>
/// <returns>A paged result whose Items hold at most PageSize entities.</returns>
public async Task<BusPagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{
    var pageSql = BuildQuerySQL<T>(options);
    var pageRows = await CurrentSession.ExecuteQueryStatementAsync(pageSql);

    // The count query is issued after the page query and before the page rows are parsed.
    var total = await GetTotalCount<T>(options);
    var items = ParseResults<T>(pageRows, options.PageSize);

    return new BusPagedResult<T>
    {
        TotalCount = total,
        Items = items
    };
}
/// <summary>
/// Builds one tablet from a set of entities that all belong to the same device path (tree model)
/// or table (table model).
/// </summary>
/// <remarks>
/// The supplied <paramref name="metadata"/> is never modified. Single-measuring columns are renamed on a
/// private copy, so a metadata instance shared between callers is not changed underneath them.
/// </remarks>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="entities">Rows to place into the tablet.</param>
/// <param name="metadata">Column names, categories and data types of the entity type.</param>
/// <returns>A tree-model or table-model tablet, depending on the runtime context.</returns>
/// <exception cref="InvalidOperationException">
/// A column has no matching property, a single-measuring tuple is incomplete, rows disagree on the
/// single-measuring name or on the device path, or <paramref name="entities"/> is empty.
/// </exception>
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{
    var useTableModel = _runtimeContext.UseTableSessionPool;
    var columnCount = metadata.ColumnNames.Count;

    // Private copy: single-measuring mode rewrites column names, which must not leak into the caller's instance.
    var tabletMetadata = new DeviceMetadata();
    if (metadata.IsSingleMeasuring)
    {
        tabletMetadata.IsSingleMeasuring = true;
    }
    tabletMetadata.ColumnNames.AddRange(metadata.ColumnNames);
    tabletMetadata.ColumnCategories.AddRange(metadata.ColumnCategories);
    tabletMetadata.DataTypes.AddRange(metadata.DataTypes);

    // Resolve the property behind each column once instead of once per row.
    var columnProperties = new PropertyInfo[columnCount];
    var isSingleMeasuringColumn = new bool[columnCount];
    for (var i = 0; i < columnCount; i++)
    {
        var measurement = metadata.ColumnNames[i];
        var propertyInfo = typeof(T).GetProperty(measurement);
        if (propertyInfo == null)
        {
            throw new InvalidOperationException($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。");
        }

        columnProperties[i] = propertyInfo;
        isSingleMeasuringColumn[i] = propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false);
    }

    var timestamps = new List<long>();
    var values = new List<List<object>>();
    var devicePaths = new HashSet<string>();
    var columnRenamed = new bool[columnCount];

    foreach (var entity in entities)
    {
        timestamps.Add(entity.Timestamps);

        var rowValues = new List<object>(columnCount);
        for (var i = 0; i < columnCount; i++)
        {
            var propertyInfo = columnProperties[i];
            var value = propertyInfo.GetValue(entity);

            if (isSingleMeasuringColumn[i] && value != null)
            {
                // Single-measuring mode: the property holds a tuple of (measuring name, measuring value).
                var tupleType = value.GetType();
                var item1 = tupleType.GetProperty("Item1")!.GetValue(value);
                var item2 = tupleType.GetProperty("Item2")!.GetValue(value);
                if (item1 == null || item2 == null)
                {
                    throw new InvalidOperationException($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。");
                }

                // The column is addressed by position; looking the original name up again would fail
                // from the second row on because the first row has already replaced it.
                var measuringName = (string)item1;
                if (columnRenamed[i]
                    && !string.Equals(tabletMetadata.ColumnNames[i], measuringName, StringComparison.Ordinal))
                {
                    throw new InvalidOperationException(
                        $"{nameof(BuildTablet)}: rows of {typeof(T).Name} use different single-measuring names " +
                        $"('{tabletMetadata.ColumnNames[i]}' and '{measuringName}') within one tablet.");
                }

                tabletMetadata.ColumnNames[i] = measuringName;
                columnRenamed[i] = true;
                rowValues.Add(item2);
            }
            else if (value != null)
            {
                rowValues.Add(value);
            }
            else
            {
                // Fall back to the default registered for the property's type name (null when none is registered).
                DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out var defaultValue);
                rowValues.Add(defaultValue);
            }
        }

        values.Add(rowValues);

        if (!useTableModel)
        {
            // Tree model: the path is derived from the entity itself.
            devicePaths.Add(DevicePathBuilder.GetDevicePath(entity));
        }
        else
        {
            // Table model: the table name is derived from the entity type.
            devicePaths.Add(DevicePathBuilder.GetTableName<T>());
        }
    }

    if (devicePaths.Count == 0)
    {
        throw new InvalidOperationException(
            $"{nameof(BuildTablet)}: cannot build a tablet for {typeof(T).Name} from an empty entity collection.");
    }

    if (devicePaths.Count > 1)
    {
        throw new InvalidOperationException($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。");
    }

    var devicePath = devicePaths.First();
    return useTableModel
        ? BuildTableSessionTablet(tabletMetadata, devicePath, values, timestamps)
        : BuildSessionTablet(tabletMetadata, devicePath, values, timestamps);
}
/// <summary>
/// Creates a tree-model tablet from column names, data types, row values and timestamps.
/// </summary>
/// <param name="metadata">Supplies the column names and data types.</param>
/// <param name="devicePath">Device path passed to the tablet as its first argument.</param>
/// <param name="values">Row values, one inner list per row.</param>
/// <param name="timestamps">One timestamp per row.</param>
/// <returns>The constructed tablet.</returns>
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath,
    List<List<object>> values, List<long> timestamps)
    => new Tablet(devicePath, metadata.ColumnNames, metadata.DataTypes, values, timestamps);
/// <summary>
/// Creates a table-model tablet; unlike the tree-model variant it also passes the column categories.
/// </summary>
/// <param name="metadata">Supplies the column names, column categories and data types.</param>
/// <param name="devicePath">Table name passed to the tablet as its first argument.</param>
/// <param name="values">Row values, one inner list per row.</param>
/// <param name="timestamps">One timestamp per row.</param>
/// <returns>The constructed tablet.</returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath,
    List<List<object>> values, List<long> timestamps)
    => new Tablet(
        devicePath,
        metadata.ColumnNames,
        metadata.ColumnCategories,
        metadata.DataTypes,
        values,
        timestamps);
/// <summary>
/// Builds the paged SELECT statement for <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// The selected columns are the metadata column names, in metadata order. The offset is
/// Page * PageSize, so a Page value of 0 addresses the first page. Path, field names and values are
/// placed into the statement text as-is.
/// </remarks>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="options">Target path/table, filter conditions and paging values.</param>
/// <returns>The SQL text.</returns>
private string BuildQuerySQL<T>(QueryOptions options) where T : IoTEntity
{
    var selectedColumns = string.Join(", ", GetMetadata<T>().ColumnNames);

    var sql = new StringBuilder();
    sql.Append("SELECT ");
    sql.Append(selectedColumns);
    sql.Append($" FROM {options.TableNameOrTreePath}");

    if (options.Conditions.Any())
    {
        var predicates = options.Conditions.Select(condition => TranslateCondition(condition));
        sql.Append(" WHERE ");
        sql.Append(string.Join(" AND ", predicates));
    }

    sql.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}");
    return sql.ToString();
}
/// <summary>
/// Builds the delete statement for <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// Tree model: <c>DELETE FROM path.col1, path.col2 [WHERE ...]</c>, one fully qualified series per
/// metadata column. Table model: <c>DELETE FROM table [WHERE ...]</c>.
/// The previous implementation glued the column list directly onto the path without a separator and
/// emitted <c>DROP  FROM</c> for the table model, so the generated text was not a valid statement.
/// NOTE(review): confirm the table-model form against the IoTDB version in use.
/// </remarks>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <param name="options">Target path/table and filter conditions.</param>
/// <returns>The SQL text.</returns>
private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity
{
    var sb = new StringBuilder("DELETE FROM ");

    if (_runtimeContext.UseTableSessionPool)
    {
        // Table model: rows are deleted from the table as a whole, no column list.
        sb.Append($"{options.TableNameOrTreePath}");
    }
    else
    {
        // Tree model: each column is addressed as a series below the device path.
        var metadata = GetMetadata<T>();
        sb.AppendJoin(", ", metadata.ColumnNames.Select(column => $"{options.TableNameOrTreePath}.{column}"));
    }

    if (options.Conditions.Any())
    {
        sb.Append(" WHERE ");
        sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition));
    }

    return sb.ToString();
}
/// <summary>
/// Translates one query condition into a SQL predicate.
/// </summary>
/// <remarks>
/// Supported operators are <c>&gt;</c>, <c>&lt;</c> and <c>=</c>. For <c>=</c> the value is emitted as a
/// single-quoted literal and embedded single quotes are doubled, so a value containing a quote can no
/// longer terminate the literal early. For <c>&gt;</c> and <c>&lt;</c> the value is emitted unquoted.
/// The field name is emitted as-is in every case.
/// </remarks>
/// <param name="condition">Field, operator and value of the predicate.</param>
/// <returns>The predicate text.</returns>
/// <exception cref="NotSupportedException">The operator is not one of the supported ones.</exception>
private string TranslateCondition(QueryCondition condition)
{
    return condition.Operator switch
    {
        ">" => $"{condition.Field} > {condition.Value}",
        "<" => $"{condition.Field} < {condition.Value}",
        "=" => $"{condition.Field} = {QuoteSqlLiteral(condition.Value)}",
        _ => throw new NotSupportedException($"Operator {condition.Operator} not supported")
    };
}

/// <summary>
/// Formats a value as a single-quoted SQL string literal, doubling embedded single quotes.
/// </summary>
private static string QuoteSqlLiteral<TValue>(TValue value)
{
    // Same text the interpolation produced before, with quotes escaped afterwards.
    var text = $"{value}";
    return "'" + text.Replace("'", "''") + "'";
}
/// <summary>
/// Executes a COUNT(*) query using the same path/table and conditions as the page query.
/// </summary>
/// <typeparam name="T">Entity type; not used by the body.</typeparam>
/// <param name="options">Target path/table and filter conditions.</param>
/// <returns>The first value of the first result row converted to Int32, or 0 when there is no row.</returns>
private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
{
    var sql = new StringBuilder($"SELECT COUNT(*) FROM {options.TableNameOrTreePath}");

    if (options.Conditions.Any())
    {
        var predicates = string.Join(" AND ", options.Conditions.Select(TranslateCondition));
        sql.Append(" WHERE ").Append(predicates);
    }

    var dataSet = await CurrentSession.ExecuteQueryStatementAsync(sql.ToString());
    if (!dataSet.HasNext())
    {
        return 0;
    }

    return Convert.ToInt32(dataSet.Next().Values[0]);
}
/// <summary>
/// Converts the rows of a result set into entities, reading at most <paramref name="pageSize"/> rows.
/// </summary>
/// <remarks>
/// Values are mapped by position: the i-th value of a row is written to the property matching the i-th
/// metadata column name (case-insensitive). This relies on the query selecting the metadata columns
/// in that same order. Null values and columns without a matching property are skipped. A value whose
/// runtime type does not fit the property is converted with <see cref="Convert.ChangeType(object, Type, IFormatProvider)"/>.
/// The previous implementation assigned the whole value list of the row to every property.
/// </remarks>
/// <typeparam name="T">Entity type; must derive from IoTEntity and have a parameterless constructor.</typeparam>
/// <param name="dataSet">Result set to read from.</param>
/// <param name="pageSize">Maximum number of entities to produce.</param>
/// <returns>The parsed entities.</returns>
private IEnumerable<T> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
{
    var results = new List<T>();
    var columnNames = GetMetadata<T>().ColumnNames;
    var properties = typeof(T).GetProperties();

    // Resolve the target property (and its non-nullable type) of every column once, not once per row.
    var columnProperties = columnNames
        .Select(name => properties.FirstOrDefault(p =>
            p.Name.Equals(name, StringComparison.OrdinalIgnoreCase)))
        .ToArray();
    var columnTypes = columnProperties
        .Select(p => p == null ? null : Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType)
        .ToArray();

    while (dataSet.HasNext() && results.Count < pageSize)
    {
        var record = dataSet.Next();
        var entity = new T
        {
            Timestamps = record.Timestamps
        };

        var rowValues = record.Values;
        for (var i = 0; i < columnProperties.Length && i < rowValues.Count; i++)
        {
            var prop = columnProperties[i];
            var raw = rowValues[i];
            if (prop == null || raw == null || raw is DBNull)
            {
                continue;
            }

            var targetType = columnTypes[i]!;
            var converted = targetType.IsInstanceOfType(raw)
                ? raw
                : Convert.ChangeType(raw, targetType, System.Globalization.CultureInfo.InvariantCulture);

            // Use the property that was matched case-insensitively; a second, case-sensitive lookup
            // by column name could miss it.
            prop.SetValue(entity, converted);
        }

        results.Add(entity);
    }

    return results;
}
/// <summary>
/// Returns the device metadata (column names, categories, data types) of <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// The metadata is computed by reflection once per entity type and kept in the static cache as a
/// template. Every call returns a fresh copy of that template, so a caller that changes the returned
/// column names (as single-measuring tablet building does) cannot affect the cache or other callers.
/// The previous implementation re-ran the reflection on every call and rewrote the shared cached
/// instance in place.
/// </remarks>
/// <typeparam name="T">Entity type; must derive from IoTEntity.</typeparam>
/// <returns>A metadata instance owned by the caller.</returns>
private DeviceMetadata GetMetadata<T>() where T : IoTEntity
{
    var template = MetadataCache.GetOrAdd(
        typeof(T),
        type => BuildDeviceMetadata(CollectColumnMetadata(type)));

    // The template itself is never handed out; it is only read from here.
    var copy = new DeviceMetadata();
    if (template.IsSingleMeasuring)
    {
        copy.IsSingleMeasuring = true;
    }
    copy.ColumnNames.AddRange(template.ColumnNames);
    copy.ColumnCategories.AddRange(template.ColumnCategories);
    copy.DataTypes.AddRange(template.DataTypes);
    return copy;
}
/// <summary>
/// Collects one column description per annotated property of <paramref name="type"/>.
/// </summary>
/// <remarks>
/// Attribute precedence is TAG, then ATTRIBUTE, then FIELD. The single-measuring attribute is only
/// honoured when none of those three is present. Properties without any of the four attributes are ignored.
/// </remarks>
/// <param name="type">Entity type to inspect.</param>
/// <returns>The collected columns, in property enumeration order.</returns>
private List<ColumnInfo> CollectColumnMetadata(Type type)
{
    var collected = new List<ColumnInfo>();

    foreach (var property in type.GetProperties())
    {
        ColumnInfo? resolved = null;

        if (property.GetCustomAttribute<TAGColumnAttribute>() is not null)
        {
            resolved = new ColumnInfo(
                property.Name,
                ColumnCategory.TAG,
                GetDataTypeFromTypeName(property.PropertyType.Name),
                false);
        }
        else if (property.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null)
        {
            resolved = new ColumnInfo(
                property.Name,
                ColumnCategory.ATTRIBUTE,
                GetDataTypeFromTypeName(property.PropertyType.Name),
                false);
        }
        else if (property.GetCustomAttribute<FIELDColumnAttribute>() is not null)
        {
            resolved = new ColumnInfo(
                property.Name,
                ColumnCategory.FIELD,
                GetDataTypeFromTypeName(property.PropertyType.Name),
                false);
        }

        var singleMeasuring = property.GetCustomAttribute<SingleMeasuringAttribute>();
        if (resolved == null && singleMeasuring != null)
        {
            // Single-measuring mode: the column is a FIELD named after the attribute's FieldName, and its
            // data type comes from the second generic argument of the property type.
            // (Per the original author's note the property is a Tuple<string, TValue>:
            // Item1 = measuring name, Item2 = measuring value.)
            var genericArguments = property.PropertyType.GetGenericArguments();
            resolved = new ColumnInfo(
                singleMeasuring.FieldName,
                ColumnCategory.FIELD,
                GetDataTypeFromTypeName(genericArguments[1].Name),
                true);
        }

        if (resolved is { } column)
        {
            collected.Add(column);
        }
    }

    return collected;
}
/// <summary>
/// Assembles device metadata from the collected columns, ordered TAG, then ATTRIBUTE, then FIELD.
/// </summary>
/// <param name="columns">Columns collected from the entity type.</param>
/// <returns>Metadata whose IsSingleMeasuring flag is set when any column is a single-measuring column.</returns>
private DeviceMetadata BuildDeviceMetadata(List<ColumnInfo> columns)
{
    var result = new DeviceMetadata();

    if (columns.Exists(column => column.IsSingleMeasuring))
    {
        result.IsSingleMeasuring = true;
    }

    // Bucket the columns by category, keeping their relative order inside each bucket.
    var byCategory = new Dictionary<ColumnCategory, List<ColumnInfo>>();
    foreach (var column in columns)
    {
        if (!byCategory.TryGetValue(column.Category, out var bucket))
        {
            bucket = new List<ColumnInfo>();
            byCategory[column.Category] = bucket;
        }

        bucket.Add(column);
    }

    foreach (var category in new[] { ColumnCategory.TAG, ColumnCategory.ATTRIBUTE, ColumnCategory.FIELD })
    {
        ProcessCategory(byCategory, category, result);
    }

    return result;
}
/// <summary>
/// Appends the names, categories and data types of one category's columns to the metadata.
/// Does nothing when the category has no columns.
/// </summary>
/// <param name="groupedColumns">Columns grouped by category.</param>
/// <param name="category">The category to append.</param>
/// <param name="metadata">Metadata whose three lists are extended.</param>
private void ProcessCategory(IReadOnlyDictionary<ColumnCategory, List<ColumnInfo>> groupedColumns, ColumnCategory category, DeviceMetadata metadata)
{
    if (!groupedColumns.TryGetValue(category, out var members))
    {
        return;
    }

    foreach (var member in members)
    {
        metadata.ColumnNames.Add(member.Name);
        metadata.ColumnCategories.Add(member.Category);
        metadata.DataTypes.Add(member.DataType);
    }
}
/// <summary>
/// Immutable description of one column derived from an entity property.
/// </summary>
private readonly struct ColumnInfo
{
    /// <summary>Column name.</summary>
    public string Name { get; }

    /// <summary>True when the column was created from a single-measuring property.</summary>
    public bool IsSingleMeasuring { get; }

    /// <summary>Column category (TAG, ATTRIBUTE or FIELD).</summary>
    public ColumnCategory Category { get; }

    /// <summary>IoTDB data type of the column.</summary>
    public TSDataType DataType { get; }

    /// <summary>Creates a column description from its four values.</summary>
    public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring)
        => (Name, Category, DataType, IsSingleMeasuring) = (name, category, dataType, isSingleMeasuring);
}
/// <summary>
/// Maps a type name to an IoTDB data type using the type-name map.
/// </summary>
/// <param name="typeName">Type name; surrounding whitespace is trimmed before the lookup.</param>
/// <returns>
/// The mapped data type, or TSDataType.STRING when the name is null, blank or not present in the map.
/// </returns>
private TSDataType GetDataTypeFromTypeName(string typeName)
{
    if (!string.IsNullOrWhiteSpace(typeName)
        && DataTypeMap.TryGetValue(typeName.Trim(), out var mapped))
    {
        return mapped;
    }

    return TSDataType.STRING;
}
/// <summary>
/// Maps a type name (case-insensitive) to an IoTDB data type.
/// </summary>
/// <remarks>
/// The map is looked up with CLR property type names, so it contains <c>SINGLE</c> (the CLR name of
/// <c>float</c>) next to <c>FLOAT</c>; without it a float property would fall back to STRING.
/// The map is static because its content never changes and the provider is created per scope.
/// </remarks>
private static readonly IReadOnlyDictionary<string, TSDataType> DataTypeMap =
    new Dictionary<string, TSDataType>(StringComparer.OrdinalIgnoreCase)
    {
        ["BOOLEAN"] = TSDataType.BOOLEAN,
        ["INT32"] = TSDataType.INT32,
        ["INT64"] = TSDataType.INT64,
        ["FLOAT"] = TSDataType.FLOAT,
        ["SINGLE"] = TSDataType.FLOAT,
        ["DOUBLE"] = TSDataType.DOUBLE,
        ["TEXT"] = TSDataType.TEXT,
        ["NULLTYPE"] = TSDataType.NONE,
        ["TIMESTAMP"] = TSDataType.TIMESTAMP,
        ["DATE"] = TSDataType.DATE,
        ["BLOB"] = TSDataType.BLOB,
        ["DECIMAL"] = TSDataType.STRING,
        ["STRING"] = TSDataType.STRING
    };
/// <summary>
/// Maps a type name (case-insensitive) to the value written when a property of that type is null.
/// </summary>
/// <remarks>
/// The <c>INT64</c> default is a <c>long</c> (<c>0L</c>); a boxed <c>int</c> cannot be unboxed as a
/// 64-bit value. <c>SINGLE</c> (the CLR name of <c>float</c>) is listed next to <c>FLOAT</c> because
/// the map is looked up with CLR property type names. Entries mapped to null have no default.
/// The map is static because its content never changes and the provider is created per scope.
/// </remarks>
private static readonly IReadOnlyDictionary<string, object> DataTypeDefaultValueMap =
    new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
    {
        ["BOOLEAN"] = false,
        ["INT32"] = 0,
        ["INT64"] = 0L,
        ["FLOAT"] = 0.0f,
        ["SINGLE"] = 0.0f,
        ["DOUBLE"] = 0.0d,
        ["TEXT"] = string.Empty,
        ["NULLTYPE"] = null,
        ["TIMESTAMP"] = null,
        ["DATE"] = null,
        ["BLOB"] = null,
        ["DECIMAL"] = "0.0",
        ["STRING"] = string.Empty
    };
}
}