using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace JiShe.CollectBus.IoTDBProvider
{
    /// <summary>
    /// IoTDB data source: inserts <see cref="IoTEntity"/> instances as tablets and
    /// runs paged queries through a <see cref="TableSessionPool"/>.
    /// </summary>
    public class IoTDBProvider : IIoTDBProvider, IDisposable
    {
        /// <summary>Maximum number of entities written per tablet by <see cref="BatchInsertAsync{T}"/>.</summary>
        private const int InsertBatchSize = 1000;

        private readonly IoTDBOptions _options;
        private readonly TableSessionPool _sessionPool;

        // Reflection results are cached per entity type and shared by all provider instances.
        private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();

        /// <summary>
        /// Builds the session pool from the supplied options and opens it.
        /// </summary>
        /// <param name="options">Connection options (cluster list, credentials, pool size).</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        public IoTDBProvider(IOptions<IoTDBOptions> options)
        {
            ArgumentNullException.ThrowIfNull(options);

            _options = options.Value;

            // NOTE(review): PoolSize is passed to SetFetchSize; this may have been meant
            // for a pool-size setter on the builder — confirm against the client API.
            _sessionPool = new TableSessionPool.Builder()
                .SetNodeUrls(_options.ClusterList)
                .SetUsername(_options.UserName)
                .SetPassword(_options.Password)
                .SetFetchSize(_options.PoolSize)
                .Build();

            // Constructors cannot await, so the open call is blocked on here.
            _sessionPool.Open(false).Wait();
        }

        /// <summary>
        /// Returns the cached column metadata for <typeparamref name="T"/>, building it by
        /// reflection on first use.
        /// </summary>
        /// <typeparam name="T">Entity type whose properties carry the column attributes.</typeparam>
        /// <returns>Column categories, measurement names and data types for the entity type.</returns>
        private DeviceMetadata GetMetadata<T>() where T : IoTEntity
        {
            // NOTE(review): the attribute type arguments were not present in the reviewed
            // text; TAGColumnAttribute / ATTRIBUTEColumnAttribute / FIELDColumnAttribute
            // are assumed from the variable names — confirm.
            return _metadataCache.GetOrAdd(typeof(T), type =>
            {
                var metadata = new DeviceMetadata();

                foreach (var prop in type.GetProperties())
                {
                    // Tag column.
                    // NOTE(review): tag and attribute columns add a category but no
                    // measurement/data type, so ColumnCategories can be longer than
                    // Measurements — confirm this is intended.
                    if (prop.GetCustomAttribute<TAGColumnAttribute>() != null)
                    {
                        metadata.ColumnCategories.Add(ColumnCategory.TAG);
                    }

                    // Attribute column.
                    if (prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() != null)
                    {
                        metadata.ColumnCategories.Add(ColumnCategory.ATTRIBUTE);
                    }

                    // Field (data) column.
                    if (prop.GetCustomAttribute<FIELDColumnAttribute>() != null)
                    {
                        // Nullable<T> reports its name as "Nullable`1"; map the wrapped type instead.
                        var valueType = Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType;

                        metadata.ColumnCategories.Add(ColumnCategory.FIELD);
                        metadata.Measurements.Add(prop.Name);
                        metadata.DataTypes.Add(GetDataTypeFromStr(valueType.Name));
                    }
                }

                return metadata;
            });
        }

        /// <summary>
        /// Inserts a single entity.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entity">Entity to write; every field-column property must be non-null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="entity"/> is null.</exception>
        /// <exception cref="InvalidOperationException">A field-column property is null.</exception>
        public async Task InsertAsync<T>(T entity) where T : IoTEntity
        {
            ArgumentNullException.ThrowIfNull(entity);

            var metadata = GetMetadata<T>();
            var tablet = BuildTablet(new[] { entity }, metadata);
            await _sessionPool.InsertAsync(tablet);
        }

        /// <summary>
        /// Inserts entities in batches of <see cref="InsertBatchSize"/>, one tablet per batch,
        /// awaiting each insert before sending the next.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entities">Entities to write. An empty sequence writes nothing.</param>
        /// <exception cref="ArgumentNullException"><paramref name="entities"/> is null.</exception>
        /// <exception cref="InvalidOperationException">A field-column property is null.</exception>
        public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
        {
            ArgumentNullException.ThrowIfNull(entities);

            var metadata = GetMetadata<T>();

            foreach (var batch in entities.Chunk(InsertBatchSize))
            {
                var tablet = BuildTablet(batch, metadata);
                await _sessionPool.InsertAsync(tablet);
            }
        }

        /// <summary>
        /// Builds one tablet from a non-empty group of entities. The device id is taken
        /// from the first entity.
        /// </summary>
        /// <typeparam name="T">Entity type.</typeparam>
        /// <param name="entities">Rows to place in the tablet.</param>
        /// <param name="metadata">Column metadata for <typeparamref name="T"/>.</param>
        /// <returns>A tablet holding one row per entity.</returns>
        /// <exception cref="ArgumentException"><paramref name="entities"/> is empty.</exception>
        /// <exception cref="InvalidOperationException">A measurement value is null or its property is missing.</exception>
        private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
        {
            // Materialise once: the sequence is read for the device id and again for the rows.
            var rows = entities as IReadOnlyList<T> ?? entities.ToList();
            if (rows.Count == 0)
            {
                throw new ArgumentException("At least one entity is required to build a tablet.", nameof(entities));
            }

            var deviceId = DevicePathBuilder.GetDeviceId(rows[0]);

            // Resolve each measurement's property once rather than per row.
            var columns = metadata.Measurements
                .Select(name => (Name: name, Property: typeof(T).GetProperty(name)))
                .ToArray();

            var timestamps = new List<long>(rows.Count);
            var values = new List<List<object>>(rows.Count);

            foreach (var entity in rows)
            {
                timestamps.Add(entity.Timestamps);

                var rowValues = new List<object>(columns.Length);
                foreach (var (measurement, property) in columns)
                {
                    var value = property?.GetValue(entity);
                    if (value == null)
                    {
                        throw new InvalidOperationException($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。");
                    }

                    rowValues.Add(value);
                }

                values.Add(rowValues);
            }

            return new Tablet(
                deviceId,
                metadata.Measurements,
                metadata.DataTypes,
                values,
                timestamps
            );
        }

        /// <summary>
        /// Runs a paged query and returns the page items together with the total match count.
        /// </summary>
        /// <typeparam name="T">Entity type to materialise.</typeparam>
        /// <param name="options">Conditions and paging settings.</param>
        /// <returns>The requested page and the total number of rows matching the conditions.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        /// <exception cref="NotSupportedException">A condition uses an unsupported operator.</exception>
        public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
        {
            ArgumentNullException.ThrowIfNull(options);

            var query = BuildQuery<T>(options);
            var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query);

            IEnumerable<T> items;
            try
            {
                items = ParseResults<T>(sessionDataSet, options.PageSize);
            }
            finally
            {
                // Close the page's data set before the count query is issued so that
                // one call never holds two result sets open at once.
                await sessionDataSet.Close();
            }

            return new PagedResult<T>
            {
                TotalCount = await GetTotalCount<T>(options),
                Items = items
            };
        }

        /// <summary>
        /// Builds the SELECT statement for one page: measurement columns, optional WHERE
        /// clause, then LIMIT/OFFSET.
        /// </summary>
        /// <typeparam name="T">Entity type that determines the table and the column list.</typeparam>
        /// <param name="options">Conditions and paging settings.</param>
        /// <returns>The SQL text.</returns>
        private string BuildQuery<T>(QueryOptions options) where T : IoTEntity
        {
            var metadata = GetMetadata<T>();

            var sb = new StringBuilder("SELECT ");
            sb.AppendJoin(", ", metadata.Measurements);
            sb.Append($" FROM {DevicePathBuilder.GetTableName<T>()}");
            sb.Append(BuildWhereClause(options));

            // NOTE(review): the offset is Page * PageSize, i.e. Page is treated as
            // zero-based here — confirm against callers.
            sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}");

            return sb.ToString();
        }

        /// <summary>
        /// Builds the " WHERE a AND b ..." fragment shared by the page query and the
        /// count query, or an empty string when there are no conditions.
        /// </summary>
        /// <param name="options">Query options carrying the conditions.</param>
        /// <returns>The WHERE fragment, with a leading space, or an empty string.</returns>
        private string BuildWhereClause(QueryOptions options)
        {
            if (options.Conditions is null || !options.Conditions.Any())
            {
                return string.Empty;
            }

            return " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
        }

        /// <summary>
        /// Translates one query condition into a SQL predicate.
        /// </summary>
        /// <param name="condition">Field, operator (<c>&gt;</c>, <c>&lt;</c> or <c>=</c>) and value.</param>
        /// <returns>The predicate text.</returns>
        /// <exception cref="NotSupportedException">The operator is not one of the supported three.</exception>
        private string TranslateCondition(QueryCondition condition)
        {
            // NOTE(review): Field, and Value for ">" / "<", are emitted verbatim; callers
            // must not pass untrusted text there.
            return condition.Operator switch
            {
                ">" => $"{condition.Field} > {condition.Value}",
                "<" => $"{condition.Field} < {condition.Value}",
                // Double embedded single quotes so the value cannot end the literal early.
                "=" => $"{condition.Field} = '{Convert.ToString(condition.Value)?.Replace("'", "''")}'",
                _ => throw new NotSupportedException($"Operator {condition.Operator} not supported")
            };
        }

        /// <summary>
        /// Counts the rows that match the query conditions, ignoring paging.
        /// </summary>
        /// <typeparam name="T">Entity type that determines the table.</typeparam>
        /// <param name="options">Query options carrying the conditions.</param>
        /// <returns>The count, or 0 when the count query returns no row.</returns>
        private async Task<int> GetTotalCount<T>(QueryOptions options) where T : IoTEntity
        {
            var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.GetTableName<T>()}" + BuildWhereClause(options);

            var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery);
            try
            {
                return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
            }
            finally
            {
                await result.Close();
            }
        }

        /// <summary>
        /// Reads up to <paramref name="pageSize"/> rows from the data set into entities.
        /// </summary>
        /// <typeparam name="T">Entity type to create for each row.</typeparam>
        /// <param name="dataSet">Result of the statement produced by <see cref="BuildQuery{T}"/>.</param>
        /// <param name="pageSize">Maximum number of rows to read.</param>
        /// <returns>The materialised entities, in result order.</returns>
        private IEnumerable<T> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
        {
            var results = new List<T>();
            var metadata = GetMetadata<T>();
            var properties = typeof(T).GetProperties();

            // BuildQuery selects the measurements in metadata order, so the value at
            // position i is matched to measurement i.
            // NOTE(review): assumes the server returns columns in SELECT-list order — confirm.
            var columns = metadata.Measurements
                .Select(measurement => properties.FirstOrDefault(
                    p => p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)))
                .ToArray();

            while (dataSet.HasNext() && results.Count < pageSize)
            {
                var record = dataSet.Next();
                var entity = new T
                {
                    Timestamps = record.Timestamps
                };

                for (var i = 0; i < columns.Length && i < record.Values.Count; i++)
                {
                    var prop = columns[i];
                    var value = record.Values[i];

                    // Leave the property at its default when the column has no value.
                    if (prop is null || value is null)
                    {
                        continue;
                    }

                    prop.SetValue(entity, value);
                }

                results.Add(entity);
            }

            return results;
        }

        /// <summary>
        /// Closes the session pool, blocking until the close completes.
        /// </summary>
        public void Dispose()
        {
            _sessionPool?.Close().Wait();
        }

        /// <summary>
        /// Maps a type name to an IoTDB data type. Matching is case-insensitive so both
        /// IoTDB names ("INT32") and CLR type names ("Int32", "Single") are recognised.
        /// </summary>
        /// <param name="str">IoTDB type name or CLR type name.</param>
        /// <returns>The matching data type; <see cref="TSDataType.STRING"/> for unrecognised or null names.</returns>
        private static TSDataType GetDataTypeFromStr(string str)
        {
            return str?.ToUpperInvariant() switch
            {
                "BOOLEAN" => TSDataType.BOOLEAN,
                "INT32" => TSDataType.INT32,
                "INT64" => TSDataType.INT64,
                // "Single" is the CLR name of float.
                "FLOAT" or "SINGLE" => TSDataType.FLOAT,
                "DOUBLE" => TSDataType.DOUBLE,
                "TEXT" => TSDataType.TEXT,
                "NULLTYPE" => TSDataType.NONE,
                "TIMESTAMP" => TSDataType.TIMESTAMP,
                "DATE" => TSDataType.DATE,
                "BLOB" => TSDataType.BLOB,
                "STRING" => TSDataType.STRING,
                _ => TSDataType.STRING
            };
        }
    }
}