using System; using System.Collections.Concurrent; using System.ComponentModel.DataAnnotations; using System.Diagnostics; using System.Reflection; using System.Reflection.Metadata.Ecma335; using System.Text; using System.Threading.Tasks; using Apache.IoTDB; using Apache.IoTDB.DataStructure; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; using Microsoft.Extensions.Logging; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using JiShe.CollectBus.Analyzers.Shared; namespace JiShe.CollectBus.IoTDB.Provider { /// /// IoTDB数据源 /// public class IoTDbProvider : IIoTDbProvider, ITransientDependency { private static readonly ConcurrentDictionary MetadataCache = new(); private readonly ILogger _logger; private readonly IIoTDbSessionFactory _sessionFactory; private readonly IoTDBRuntimeContext _runtimeContext; private IIoTDbSessionPool CurrentSession => _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); /// /// IoTDbProvider /// /// /// /// public IoTDbProvider( ILogger logger, IIoTDbSessionFactory sessionFactory, IoTDBRuntimeContext runtimeContext) { _logger = logger; _sessionFactory = sessionFactory; _runtimeContext = runtimeContext; } /// /// 插入数据 /// /// /// /// public async Task InsertAsync(T entity) where T : IoTEntity { try { var metadata = await GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata); await CurrentSession.InsertAsync(tablet); } catch (Exception ex) { _logger.LogError(ex, $"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时发生异常"); throw; } } /// /// 批量插入数据 /// /// /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { try { var metadata = await GetMetadata(); var batchSize = 1000; var batches = entities.Chunk(batchSize); foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); await CurrentSession.InsertAsync(tablet); } } catch (Exception ex) { _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常"); throw; } } /// /// 批量插入数据 /// /// /// 设备元数据 /// /// public async Task BatchInsertAsync(DeviceMetadata deviceMetadata, IEnumerable entities) where T : IoTEntity { try { var batchSize = 1000; var batches = entities.Chunk(batchSize); foreach (var batch in batches) { var tablet = BuildTablet(batch, deviceMetadata); await CurrentSession.InsertAsync(tablet); } } catch (Exception ex) { _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常"); throw; } } /// /// 删除数据 /// /// /// /// public async Task DeleteAsync(IoTDBQueryOptions options) where T : IoTEntity { try { var query = await BuildDeleteSQL(options); var result = await CurrentSession.ExecuteQueryStatementAsync(query); if (result == null) { return 0; } if (!result.HasNext()) { _logger.LogWarning($"{typeof(T).Name} IoTDB删除{typeof(T).Name}的数据时,没有返回受影响记录数量。"); return 0; } //获取唯一结果行 var row = result.Next(); await result.Close(); var dataResult = row.Values[0]; return dataResult; } catch (Exception ex) { _logger.LogError(ex, $"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时发生异常"); throw; } } /// /// 获取设备元数据 /// /// /// public async Task GetMetadata() where T : IoTEntity { var accessor = SourceEntityAccessorFactory.GetAccessor(); var columns = CollectColumnMetadata(typeof(T)); var metadata = BuildDeviceMetadata(columns); var metaData = MetadataCache.AddOrUpdate( typeof(T), addValueFactory: t => metadata, // 如果键不存在,用此值添加 updateValueFactory: (t, existingValue) => { var columns = CollectColumnMetadata(t); var metadata = BuildDeviceMetadata(columns); //对现有值 existingValue 进行修改,返回新值 existingValue.ColumnNames = metadata.ColumnNames; return existingValue; } ); return await Task.FromResult(metaData); } /// /// 查询数据 /// /// /// /// public async Task> QueryAsync(IoTDBQueryOptions options) where T : IoTEntity, new() { try { var stopwatch2 = Stopwatch.StartNew(); var query = await BuildQuerySQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); var result = new BusPagedResult { TotalCount = await GetTotalCount(options), Items = await ParseResults(sessionDataSet, options.PageSize), PageIndex = options.PageIndex, PageSize = options.PageSize, }; stopwatch2.Stop(); _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); //int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize); if (result.Items.Count() < result.PageSize) { result.HasNext = false; } else { result.HasNext = true; } //result.HasNext = result.Items.Count() > 0 ? result.Items.Count() < result.PageSize : false; return result; } catch (Exception ex) { CurrentSession.Dispose(); _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询{typeof(T).Name}的数据时发生异常"); throw; } } /// /// 构建Tablet /// /// /// 表实体 /// 设备元数据 /// private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { var timestamps = new List(); var values = new List>(); var devicePaths = new HashSet(); List tempColumnNames = new List(); tempColumnNames.AddRange(metadata.ColumnNames); var accessor = SourceEntityAccessorFactory.GetAccessor(); var entityTypeAttribute = typeof(T).GetCustomAttribute(); if (entityTypeAttribute == null) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101"); } if (metadata.EntityType != entityTypeAttribute.EntityType) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102"); } if (metadata.EntityType == Enums.EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接,属于异常情况,-103"); } else if (metadata.EntityType == Enums.EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false) { throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接,属于异常情况,-104"); } string tableNameOrTreePath = string.Empty; var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute(); if (tableNameOrTreePathAttribute != null) { tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath; } foreach (var entity in entities) { timestamps.Add(entity.Timestamps); var rowValues = new List(); foreach (var measurement in tempColumnNames) { rowValues.Add(accessor.GetPropertyValue(entity,measurement)); } //foreach (var measurement in tempColumnNames) //{ // PropertyInfo propertyInfo = typeof(T).GetProperty(measurement); // if (propertyInfo == null) // { // throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况,-101。"); // } // var value = propertyInfo.GetValue(entity); // if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false) && metadata.IsSingleMeasuring == true)//表示当前对象是单测点模式 // { // if (value != null) // { // Type tupleType = value.GetType(); // Type[] tupleArgs = tupleType.GetGenericArguments(); // Type item2Type = tupleArgs[1]; // T 的实际类型 // var item1 = tupleType.GetProperty("Item1")!.GetValue(value); // var item2 = tupleType.GetProperty("Item2")!.GetValue(value); // if (item1 == null || item2 == null) // { // throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,单测点模式构建失败,没有获取测点名称或者测点值,-102。"); // } // var indexOf = metadata.ColumnNames.IndexOf(measurement); // metadata.ColumnNames[indexOf] = (string)item1!; // rowValues.Add(item2); // } // else // { // rowValues.Add(null); // } // //同时如果是单测点模式,且是table模型存储,路径只能通过DevicePathBuilder.GetDeviceTableName(entity)获取 // if (_runtimeContext.UseTableSessionPool) // { // tableNameOrTreePath = DevicePathBuilder.GetDeviceTableName(entity); // } // } // else // { // //需要根据value的类型,进行相应的值映射转换,例如datetime转换为long的时间戳值 // if (value != null) // { // Type tupleType = value.GetType(); // var tempValue = tupleType.Name.ToUpper() switch // { // "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(), // _ => value // }; // rowValues.Add(tempValue); // } // else // { // rowValues.Add(value); // } // } //} values.Add(rowValues); //如果指定了路径 if (!string.IsNullOrWhiteSpace(tableNameOrTreePath)) { devicePaths.Add(tableNameOrTreePath); } else { if (!_runtimeContext.UseTableSessionPool)//树模型 { devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); } else { devicePaths.Add(DevicePathBuilder.GetTableName()); } } } if (devicePaths.Count > 1) { throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); } return _runtimeContext.UseTableSessionPool ? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps) : BuildSessionTablet(metadata, devicePaths.First(), values, timestamps); } /// /// 构建tree模型的Tablet /// /// 已解析的设备数据元数据 /// 设备路径 /// 数据集合 /// 时间戳集合 /// private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List> values, List timestamps) { //todo 树模型需要去掉TAG类型和ATTRIBUTE类型的字段,只需要保留FIELD类型字段即可 return new Tablet( devicePath, metadata.ColumnNames, metadata.DataTypes, values, timestamps ); } /// /// 构建表模型的Tablet /// /// 已解析的设备数据元数据 /// 表名称 /// 数据集合 /// 时间戳集合 /// private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List> values, List timestamps) { var tablet = new Tablet( tableName, metadata.ColumnNames, metadata.ColumnCategories, metadata.DataTypes, values, timestamps ); return tablet; } /// /// 构建查询语句 /// /// /// /// private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { var metadata = await GetMetadata(); var sb = new StringBuilder("SELECT TIME as Timestamps,"); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); if (options.Conditions.Any()) { sb.Append(" WHERE "); sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); } sb.Append($" LIMIT {options.PageSize} OFFSET {options.PageIndex * options.PageSize}"); return sb.ToString(); } /// /// 构建删除语句 /// /// /// /// private async Task BuildDeleteSQL(IoTDBQueryOptions options) where T : IoTEntity { var metadata = await GetMetadata(); var sb = new StringBuilder(); if (!_runtimeContext.UseTableSessionPool) { sb.Append("DELETE "); } else { sb.Append("DROP "); } sb.Append($" FROM {options.TableNameOrTreePath}"); sb.AppendJoin(", ", metadata.ColumnNames); if (options.Conditions.Any()) { sb.Append(" WHERE "); sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); } return sb.ToString(); } /// /// 将查询条件转换为SQL语句 /// /// /// /// private string TranslateCondition(QueryCondition condition) { return condition.Operator switch { ">" => condition.IsNumber ? $"{condition.Field} > {condition.Value}" : $"{condition.Field} > '{condition.Value}'", "<" => condition.IsNumber ? $"{condition.Field} < {condition.Value}" : $"{condition.Field} < '{condition.Value}'", "=" => condition.IsNumber ? $"{condition.Field} = {condition.Value}" : $"{condition.Field} = '{condition.Value}'", _ => throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况") }; } /// /// 获取查询条件的总数量 /// /// /// /// private async Task GetTotalCount(IoTDBQueryOptions options) where T : IoTEntity { var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}"; if (options.Conditions.Any()) { countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); } var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); if (result == null) { return 0; } if (!result.HasNext()) { return 0; } var count = Convert.ToInt32(result.Next().Values[0]); await result.Close(); return count; } /// /// 解析查询结果 /// /// /// /// /// private async Task> ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() { var results = new List(); var metadata = await GetMetadata(); var properties = typeof(T).GetProperties(); var columns = new List() { "Timestamps" }; var dataTypes = new List() { TSDataType.TIMESTAMP }; columns.AddRange(metadata.ColumnNames); dataTypes.AddRange(metadata.DataTypes); //metadata.ColumnNames.Insert(0, "Timestamps"); //metadata.DataTypes.Insert(0, TSDataType.TIMESTAMP); while (dataSet.HasNext() && results.Count < pageSize) { var record = dataSet.Next(); var entity = new T { Timestamps = record.Timestamps }; foreach (var measurement in columns) { int indexOf = columns.IndexOf(measurement); var value = record.Values[indexOf]; TSDataType tSDataType = dataTypes[indexOf]; var prop = properties.FirstOrDefault(p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); if (prop != null && !(value is System.DBNull)) { dynamic tempValue = GetTSDataValue(tSDataType, value); if (measurement.ToLower().EndsWith("time")) { typeof(T).GetProperty(measurement)?.SetValue(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds)); } else { typeof(T).GetProperty(measurement)?.SetValue(entity, tempValue); } } } results.Add(entity); } await dataSet.Close(); return results; } /// /// 获取设备元数据的列 /// /// /// private List CollectColumnMetadata(Type type) { var columns = new List(); foreach (var prop in type.GetProperties()) { string typeName = string.Empty; Type declaredType = prop.PropertyType; // 处理可空类型 if (declaredType.IsGenericType && declaredType.GetGenericTypeDefinition() == typeof(Nullable<>)) { Type underlyingType = Nullable.GetUnderlyingType(declaredType); typeName = underlyingType.Name; } else { typeName = declaredType.Name; } //先获取Tag标签和属性标签 ColumnInfo? column = prop.GetCustomAttribute() is not null ? new ColumnInfo( name: prop.Name, category: ColumnCategory.TAG, dataType: GetDataTypeFromTypeName(typeName), false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(typeName), false ) : prop.GetCustomAttribute() is not null ? new ColumnInfo( prop.Name, ColumnCategory.FIELD, GetDataTypeFromTypeName(typeName), false) : null; //最先检查是不是单侧点模式 SingleMeasuringAttribute singleMeasuringAttribute = prop.GetCustomAttribute(); if (singleMeasuringAttribute != null && column == null) { //warning: 单侧点模式注意事项 //Entity实体 字段类型是 Tuple,Item1=>测点名称,Item2=>测点值,泛型 //只有一个Filed字段。 //MeasuringName 默认为 SingleMeasuringAttribute.FieldName,以便于在获取对应的Value的时候重置为 Item1 的值。 Type tupleType = prop.PropertyType; Type[] tupleArgs = tupleType.GetGenericArguments(); column = new ColumnInfo( singleMeasuringAttribute.FieldName, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleArgs[1].Name), true ); } if (column.HasValue) { columns.Add(column.Value); } } return columns; } /// /// 构建设备元数据 /// /// 待解析的类 /// 已处理好的数据列 /// private DeviceMetadata BuildDeviceMetadata(List columns) where T : IoTEntity { var metadata = new DeviceMetadata(); //先检查是不是单侧点模型 if (columns.Any(c => c.IsSingleMeasuring)) { metadata.IsSingleMeasuring = true; } //按业务逻辑顺序处理(TAG -> ATTRIBUTE -> FIELD) var groupedColumns = columns .GroupBy(c => c.Category) .ToDictionary(g => g.Key, g => g.ToList()); ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata); ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata); ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata); var entityTypeAttribute = typeof(T).GetCustomAttribute(); if (entityTypeAttribute == null) { throw new ArgumentException($"{nameof(BuildDeviceMetadata)} 构建设备元数据时 {nameof(IoTEntity)} 的EntityType 没有指定,属于异常情况,-101"); } metadata.EntityType = entityTypeAttribute.EntityType; return metadata; } /// /// 处理不同列类型的逻辑 /// /// /// /// private void ProcessCategory(IReadOnlyDictionary> groupedColumns, ColumnCategory category, DeviceMetadata metadata) { if (groupedColumns.TryGetValue(category, out var cols)) { metadata.ColumnNames.AddRange(cols.Select(c => c.Name)); metadata.ColumnCategories.AddRange(cols.Select(c => c.Category)); metadata.DataTypes.AddRange(cols.Select(c => c.DataType)); } } /// /// 数据列结构 /// private readonly struct ColumnInfo { /// /// 列名 /// public string Name { get; } /// /// 是否是单测点 /// public bool IsSingleMeasuring { get; } /// /// 列类型 /// public ColumnCategory Category { get; } /// /// 数据类型 /// public TSDataType DataType { get; } public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring) { Name = name; Category = category; DataType = dataType; IsSingleMeasuring = isSingleMeasuring; } } /// /// 根据类型名称获取对应的 IoTDB 数据类型 /// /// 类型名称(不区分大小写) /// 对应的 TSDataType,默认返回 TSDataType.STRING private TSDataType GetDataTypeFromTypeName(string typeName) { if (string.IsNullOrWhiteSpace(typeName)) return TSDataType.STRING; return DataTypeMap.TryGetValue(typeName.Trim(), out var dataType) ? dataType : TSDataType.STRING; } /// /// 根据类型名称获取 IoTDB 数据类型 /// private readonly IReadOnlyDictionary DataTypeMap = new Dictionary(StringComparer.OrdinalIgnoreCase) { ["BOOLEAN"] = TSDataType.BOOLEAN, ["INT32"] = TSDataType.INT32, ["INT64"] = TSDataType.INT64, ["FLOAT"] = TSDataType.FLOAT, ["DOUBLE"] = TSDataType.DOUBLE, ["TEXT"] = TSDataType.TEXT, ["NULLTYPE"] = TSDataType.NONE, ["DATETIME"] = TSDataType.TIMESTAMP, ["DATE"] = TSDataType.DATE, ["BLOB"] = TSDataType.BLOB, ["DECIMAL"] = TSDataType.STRING, ["STRING"] = TSDataType.STRING }; /// /// 根据类型名称获取 IoTDB 数据默认值 /// private readonly IReadOnlyDictionary DataTypeDefaultValueMap = new Dictionary(StringComparer.OrdinalIgnoreCase) { ["BOOLEAN"] = false, ["INT32"] = 0, ["INT64"] = 0, ["FLOAT"] = 0.0f, ["DOUBLE"] = 0.0d, ["TEXT"] = string.Empty, ["NULLTYPE"] = null, ["DATETIME"] = null, ["DATE"] = null, ["BLOB"] = null, ["DECIMAL"] = "0.0", ["STRING"] = string.Empty }; /// /// IoTDB 数据类型与.net类型映射 /// /// /// /// private dynamic GetTSDataValue(TSDataType tSDataType, object value) => tSDataType switch { TSDataType.BOOLEAN => Convert.ToBoolean(value), TSDataType.INT32 => Convert.ToInt32(value), TSDataType.INT64 => Convert.ToInt64(value), TSDataType.FLOAT => Convert.ToDouble(value), TSDataType.DOUBLE => Convert.ToDouble(value), TSDataType.TEXT => Convert.ToString(value), TSDataType.NONE => null, TSDataType.TIMESTAMP => Convert.ToInt64(value), TSDataType.DATE => Convert.ToDateTime(value), TSDataType.BLOB => Convert.ToByte(value), TSDataType.STRING => Convert.ToString(value), _ => Convert.ToString(value) }; } }