using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace JiShe.CollectBus.IoTDBProvider
{
    /// <summary>
    /// IoTDB data source: inserts, deletes and queries <see cref="IoTEntity"/> rows
    /// through a pooled table-model session.
    /// </summary>
    public class IoTDBProvider : IIoTDBProvider, IDisposable
    {
        /// <summary>Maximum number of entities written per tablet by <see cref="BatchInsertAsync{T}"/>.</summary>
        private const int BatchSize = 1000;

        /// <summary>
        /// Status value treated as "insert succeeded".
        /// NOTE(review): the Apache IoTDB C# client reports success as 0 (its samples assert
        /// <c>status == 0</c>); the previous check <c>result &lt;= 0</c> logged a warning on success.
        /// Confirm against the client version in use.
        /// </summary>
        private const int InsertSuccessStatus = 0;

        /// <summary>Per-entity-type column metadata, built once via reflection and shared by all instances.</summary>
        private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();

        /// <summary>
        /// Maps type names (case-insensitive) to IoTDB data types. Callers pass CLR type names
        /// (<c>PropertyType.Name</c>), so "Single" is mapped explicitly: without it a
        /// <c>float</c> property would fall through to the STRING default.
        /// </summary>
        private static readonly IReadOnlyDictionary<string, TSDataType> DataTypeMap =
            new Dictionary<string, TSDataType>(StringComparer.OrdinalIgnoreCase)
            {
                ["BOOLEAN"] = TSDataType.BOOLEAN,
                ["INT32"] = TSDataType.INT32,
                ["INT64"] = TSDataType.INT64,
                ["FLOAT"] = TSDataType.FLOAT,
                ["SINGLE"] = TSDataType.FLOAT,
                ["DOUBLE"] = TSDataType.DOUBLE,
                ["TEXT"] = TSDataType.TEXT,
                ["NULLTYPE"] = TSDataType.NONE,
                ["TIMESTAMP"] = TSDataType.TIMESTAMP,
                ["DATE"] = TSDataType.DATE,
                ["BLOB"] = TSDataType.BLOB,
                ["STRING"] = TSDataType.STRING
            };

        private readonly IoTDBOptions _options;
        private readonly TableSessionPool _sessionPool;
        private readonly ILogger<IoTDBProvider> _logger;
        private bool _disposed;

        /// <summary>
        /// Builds the session pool from <paramref name="options"/> and opens it synchronously.
        /// </summary>
        /// <param name="options">Connection settings (cluster node list, credentials, database).</param>
        /// <param name="logger">Logger used for warnings about unsuccessful operations.</param>
        public IoTDBProvider(IOptions<IoTDBOptions> options, ILogger<IoTDBProvider> logger)
        {
            ArgumentNullException.ThrowIfNull(options);

            _options = options.Value;
            _logger = logger;

            // NOTE(review): the fetch size is fed from PoolSize here; this looks like it may have
            // been meant to configure the pool size instead — confirm against the builder API.
            _sessionPool = new TableSessionPool.Builder()
                .SetNodeUrls(_options.ClusterList)
                .SetUsername(_options.UserName)
                .SetPassword(_options.Password)
                .SetDatabase(_options.DataBaseName)
                .SetFetchSize(_options.PoolSize)
                .Build();

            // A constructor cannot await, so the pool is opened by blocking on the task.
            _sessionPool.Open(false).Wait();

            if (_options.OpenDebugMode)
            {
                _sessionPool.OpenDebugMode(builder =>
                {
                    builder.AddConsole();
                });
            }
        }

        /// <summary>
        /// Inserts a single entity.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entity">Entity to write.</param>
        /// <param name="buildTabletMode">
        /// 1 = address the tablet by the device id derived from the entity,
        /// 2 = address the tablet by the table name derived from <typeparamref name="T"/>.
        /// </param>
        public async Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity
        {
            var metadata = GetMetadata<T>();
            var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode);

            var result = await _sessionPool.InsertAsync(tablet);
            if (result != InsertSuccessStatus)
            {
                _logger.LogWarning("{EntityName}插入数据没有成功", typeof(T).Name);
            }
        }

        /// <summary>
        /// Inserts entities in batches of <see cref="BatchSize"/>, logging a warning for every
        /// batch the client reports as unsuccessful.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entities">Entities to write.</param>
        /// <param name="buildTabletMode">See <see cref="InsertAsync{T}"/>.</param>
        public async Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity
        {
            ArgumentNullException.ThrowIfNull(entities);

            var metadata = GetMetadata<T>();

            // Materialize once so the total batch count can be reported without enumerating twice.
            var rows = entities as IReadOnlyCollection<T> ?? entities.ToList();
            var batchCount = (rows.Count + BatchSize - 1) / BatchSize;
            var batchIndex = 0;

            foreach (var batch in rows.Chunk(BatchSize))
            {
                batchIndex++;
                var tablet = BuildTablet(batch, metadata, buildTabletMode);

                var result = await _sessionPool.InsertAsync(tablet);
                if (result != InsertSuccessStatus)
                {
                    _logger.LogWarning(
                        "{EntityName} 批量插入数据第{BatchIndex}批次没有成功,共{BatchCount}批次。",
                        typeof(T).Name, batchIndex, batchCount);
                }
            }
        }

        /// <summary>
        /// Deletes the rows matching <paramref name="options"/>.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="options">Target table and filter conditions.</param>
        /// <returns>The first value of the first result row, or 0 when the statement returns no rows.</returns>
        public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
        {
            var query = BuildDeleteSQL<T>(options);

            // NOTE(review): a DELETE is executed through the query API here; the non-query API
            // may be the intended call — confirm against the client documentation.
            var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);
            try
            {
                if (!sessionDataSet.HasNext())
                {
                    _logger.LogWarning("{EntityName} 删除数据时,没有返回受影响记录数量。", typeof(T).Name);
                    return 0;
                }

                // Only the first result row is read.
                var row = sessionDataSet.Next();
                return row.Values[0];
            }
            finally
            {
                // Closing the data set releases the pooled connection it holds.
                await sessionDataSet.Close();
            }
        }

        /// <summary>
        /// Runs a paged query and returns the page together with the total match count.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="options">Target table, filter conditions, page index and page size.</param>
        /// <returns>The requested page and the total number of matching rows.</returns>
        public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
        {
            var query = BuildQuerySQL<T>(options);
            var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);

            IEnumerable<T> items;
            try
            {
                items = ParseResults<T>(sessionDataSet, options.PageSize);
            }
            finally
            {
                // Release the connection before the count query asks the pool for another one.
                await sessionDataSet.Close();
            }

            return new PagedResult<T>
            {
                TotalCount = await GetTotalCount<T>(options),
                Items = items
            };
        }

        /// <summary>
        /// Builds a tablet from the given entities.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entities">Entities supplying one row each.</param>
        /// <param name="metadata">Column metadata for <typeparamref name="T"/>.</param>
        /// <param name="buildTabletMode">
        /// 1 = use the device id derived from the first entity,
        /// 2 = use the table name derived from <typeparamref name="T"/>.
        /// </param>
        /// <exception cref="InvalidOperationException">A mapped property is missing or has a null value.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="buildTabletMode"/> is neither 1 nor 2.</exception>
        private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata, int buildTabletMode) where T : IoTEntity
        {
            var rows = entities as IReadOnlyList<T> ?? entities.ToList();

            // Resolve the reflected properties once instead of once per row and column.
            var columnProperties = metadata.ColumnNames
                .Select(name => typeof(T).GetProperty(name))
                .ToArray();

            var timestamps = new List<long>(rows.Count);
            var values = new List<List<object>>(rows.Count);

            foreach (var entity in rows)
            {
                timestamps.Add(entity.Timestamps);

                var rowValues = new List<object>(columnProperties.Length);
                for (var i = 0; i < columnProperties.Length; i++)
                {
                    var value = columnProperties[i]?.GetValue(entity);
                    if (value is null)
                    {
                        throw new InvalidOperationException(
                            $"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{metadata.ColumnNames[i]}值为空,不符合IoTDB设计标准,请赋值以后重新处理。");
                    }

                    rowValues.Add(value);
                }

                values.Add(rowValues);
            }

            var target = buildTabletMode switch
            {
                1 => DevicePathBuilder.GetDeviceId(rows.First()),
                2 => DevicePathBuilder.GetTableName<T>(),
                _ => throw new ArgumentOutOfRangeException(
                    nameof(buildTabletMode),
                    buildTabletMode,
                    $"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,buildTabletMode参数值不正确,请赋值以后重新处理。")
            };

            return new Tablet(
                target,
                metadata.ColumnNames,
                metadata.ColumnCategories,
                metadata.DataTypes,
                values,
                timestamps);
        }

        /// <summary>
        /// Builds the paged SELECT statement. Columns are listed in metadata order, which
        /// <see cref="ParseResults{T}"/> relies on when mapping values back to properties.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="options">Target table, filter conditions and paging.</param>
        /// <returns>The SQL text.</returns>
        private string BuildQuerySQL<T>(QueryOptions options) where T : IoTEntity
        {
            var metadata = GetMetadata<T>();

            var sb = new StringBuilder("SELECT ");
            sb.AppendJoin(", ", metadata.ColumnNames);
            sb.Append($" FROM {options.TableNameOrTagName}");
            sb.Append(BuildWhereClause(options));
            // Page is used as a zero-based index: OFFSET = Page * PageSize.
            sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}");
            return sb.ToString();
        }

        /// <summary>
        /// Builds the DELETE statement. The column list is intentionally not emitted: the
        /// previous implementation appended it directly after the table name, which produced
        /// malformed SQL.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="options">Target table and filter conditions.</param>
        /// <returns>The SQL text.</returns>
        private string BuildDeleteSQL<T>(QueryOptions options) where T : IoTEntity
        {
            var sb = new StringBuilder("DELETE");
            sb.Append($" FROM {options.TableNameOrTagName}");
            sb.Append(BuildWhereClause(options));
            return sb.ToString();
        }

        /// <summary>
        /// Returns <c>" WHERE c1 AND c2 ..."</c> for the conditions in <paramref name="options"/>,
        /// or an empty string when there are none.
        /// </summary>
        private string BuildWhereClause(QueryOptions options)
        {
            if (!options.Conditions.Any())
            {
                return string.Empty;
            }

            return " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
        }

        /// <summary>
        /// Translates one query condition into a SQL predicate.
        /// </summary>
        /// <param name="condition">Field, operator and value.</param>
        /// <returns>The SQL predicate text.</returns>
        /// <exception cref="NotSupportedException">The operator is not one of &gt;, &lt; or =.</exception>
        private string TranslateCondition(QueryCondition condition)
        {
            // NOTE(review): field names and the operands of > and < are interpolated verbatim;
            // they must not come from untrusted input.
            return condition.Operator switch
            {
                ">" => $"{condition.Field} > {condition.Value}",
                "<" => $"{condition.Field} < {condition.Value}",
                // Double embedded single quotes so the value cannot terminate the literal early.
                "=" => $"{condition.Field} = '{$"{condition.Value}".Replace("'", "''")}'",
                _ => throw new NotSupportedException($"Operator {condition.Operator} not supported")
            };
        }

        /// <summary>
        /// Counts the rows matching the conditions in <paramref name="options"/>.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="options">Target table and filter conditions.</param>
        /// <returns>The count, or 0 when the statement returns no rows.</returns>
        private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
        {
            var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTagName}" + BuildWhereClause(options);

            var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery);
            try
            {
                return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
            }
            finally
            {
                await result.Close();
            }
        }

        /// <summary>
        /// Converts up to <paramref name="pageSize"/> result rows into entities.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="dataSet">Result of the statement built by <see cref="BuildQuerySQL{T}"/>.</param>
        /// <param name="pageSize">Maximum number of rows to read.</param>
        /// <returns>The parsed entities.</returns>
        private IEnumerable<T> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
        {
            var results = new List<T>();
            var metadata = GetMetadata<T>();
            var properties = typeof(T).GetProperties();

            // One property slot per selected column, resolved once rather than per row.
            var columnProperties = metadata.ColumnNames
                .Select(name => properties.FirstOrDefault(
                    p => p.Name.Equals(name, StringComparison.OrdinalIgnoreCase)))
                .ToArray();

            while (dataSet.HasNext() && results.Count < pageSize)
            {
                var record = dataSet.Next();
                var entity = new T
                {
                    Timestamps = record.Timestamps
                };

                // Values are read positionally: the SELECT lists the columns in metadata order.
                // The previous code assigned the whole Values list to every property.
                var columnCount = Math.Min(columnProperties.Length, record.Values.Count);
                for (var i = 0; i < columnCount; i++)
                {
                    columnProperties[i]?.SetValue(entity, record.Values[i]);
                }

                results.Add(entity);
            }

            return results;
        }

        /// <summary>
        /// Closes the session pool. Safe to call more than once; failures are logged instead
        /// of being thrown out of Dispose.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                _sessionPool?.Close().Wait();
            }
            catch (Exception ex)
            {
                _logger?.LogWarning(ex, "Failed to close the IoTDB session pool.");
            }
        }

        /// <summary>
        /// Returns the cached column metadata for <typeparamref name="T"/>, building it on first use.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        private DeviceMetadata GetMetadata<T>() where T : IoTEntity
        {
            return _metadataCache.GetOrAdd(
                typeof(T),
                type => BuildDeviceMetadata(CollectColumnMetadata(type)));
        }

        /// <summary>
        /// Collects one column description per public property of <paramref name="type"/>
        /// that carries a column attribute; other properties are ignored.
        /// </summary>
        /// <param name="type">Entity type to inspect.</param>
        private List<ColumnInfo> CollectColumnMetadata(Type type)
        {
            var columns = new List<ColumnInfo>();

            foreach (var prop in type.GetProperties())
            {
                // Attributes are checked in priority order (TAG, then ATTRIBUTE, then FIELD);
                // the first match decides the category.
                // NOTE(review): confirm these attribute type names against the project's
                // attribute declarations.
                ColumnCategory? category =
                    prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? ColumnCategory.TAG
                    : prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null ? ColumnCategory.ATTRIBUTE
                    : prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? ColumnCategory.FIELD
                    : null;

                if (!category.HasValue)
                {
                    continue;
                }

                // Unwrap Nullable<T> so that e.g. int? is typed as INT32 instead of the STRING default.
                var clrType = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;
                columns.Add(new ColumnInfo(prop.Name, category.Value, GetDataTypeFromTypeName(clrType.Name)));
            }

            return columns;
        }

        /// <summary>
        /// Arranges the collected columns into device metadata, grouped by category.
        /// </summary>
        /// <param name="columns">Columns gathered by <see cref="CollectColumnMetadata"/>.</param>
        private DeviceMetadata BuildDeviceMetadata(List<ColumnInfo> columns)
        {
            var metadata = new DeviceMetadata();

            var groupedColumns = columns
                .GroupBy(c => c.Category)
                .ToDictionary(g => g.Key, g => g.ToList());

            // Columns are emitted in the order TAG -> ATTRIBUTE -> FIELD.
            ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata);
            ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata);
            ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata);

            return metadata;
        }

        /// <summary>
        /// Appends the columns of one category, if any, to the three parallel metadata lists.
        /// </summary>
        /// <param name="groupedColumns">Columns grouped by category.</param>
        /// <param name="category">Category to append.</param>
        /// <param name="metadata">Metadata being populated.</param>
        private void ProcessCategory(
            IReadOnlyDictionary<ColumnCategory, List<ColumnInfo>> groupedColumns,
            ColumnCategory category,
            DeviceMetadata metadata)
        {
            if (!groupedColumns.TryGetValue(category, out var cols))
            {
                return;
            }

            metadata.ColumnNames.AddRange(cols.Select(c => c.Name));
            metadata.ColumnCategories.AddRange(cols.Select(c => c.Category));
            metadata.DataTypes.AddRange(cols.Select(c => c.DataType));
        }

        /// <summary>
        /// Name, category and data type of a single column.
        /// </summary>
        private readonly struct ColumnInfo
        {
            public string Name { get; }
            public ColumnCategory Category { get; }
            public TSDataType DataType { get; }

            public ColumnInfo(string name, ColumnCategory category, TSDataType dataType)
            {
                Name = name;
                Category = category;
                DataType = dataType;
            }
        }

        /// <summary>
        /// Resolves a type name to the matching IoTDB data type.
        /// </summary>
        /// <param name="typeName">Type name (case-insensitive; surrounding whitespace is ignored).</param>
        /// <returns>The mapped <see cref="TSDataType"/>, or <see cref="TSDataType.STRING"/> when the name is blank or unknown.</returns>
        private TSDataType GetDataTypeFromTypeName(string typeName)
        {
            if (string.IsNullOrWhiteSpace(typeName))
            {
                return TSDataType.STRING;
            }

            return DataTypeMap.TryGetValue(typeName.Trim(), out var dataType)
                ? dataType
                : TSDataType.STRING;
        }
    }
}