using Apache.IoTDB; using Apache.IoTDB.DataStructure; using Microsoft.Extensions.Options; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// /// IoTDB数据源 /// public class IoTDBProvider : IIoTDBProvider, IDisposable { private readonly IoTDBOptions _options; private readonly SessionPool _sessionPool; private static readonly ConcurrentDictionary _metadataCache = new(); public IoTDBProvider(IOptions options) { _options = options.Value; _sessionPool = new SessionPool( _options.ClusterList, _options.UserName, _options.Password, _options.PoolSize); _sessionPool.Open(false).Wait(); } /// /// 获取设备元数据 /// /// /// private DeviceMetadata GetMetadata() where T : IoTEntity { return _metadataCache.GetOrAdd(typeof(T), type => { var metadata = new DeviceMetadata(); foreach (var prop in type.GetProperties()) { var attr = prop.GetCustomAttribute(); if (attr != null) { metadata.Tags.Add(prop.Name); } else if (prop.Name != nameof(IoTEntity.Timestamp)) { metadata.Measurements.Add(prop.Name); } } return metadata; }); } /// /// 插入数据 /// /// /// /// public async Task InsertAsync(T entity) where T : IoTEntity { var metadata = GetMetadata(); var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); await EnsureStorageGroupCreated(storageGroup); var tablet = BuildTablet(new[] { entity }, metadata); await _sessionPool.InsertAlignedTabletAsync(tablet); } /// /// 批量插入数据 /// /// /// /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { var metadata = GetMetadata(); var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); await EnsureStorageGroupCreated(storageGroup); var batchSize = 1000; var batches = entities.Chunk(batchSize); foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); await _sessionPool.InsertAlignedTabletAsync(tablet); } } /// /// 构建表模型 /// /// /// /// /// private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { var devicePath = DevicePathBuilder.BuildDevicePath(entities.First()); var timestamps = new List(); var values = new List>(); foreach (var entity in entities) { timestamps.Add(entity.Timestamp); var rowValues = new List(); foreach (var measurement in metadata.Measurements) { var value = typeof(T).GetProperty(measurement)?.GetValue(entity); rowValues.Add(value ?? DBNull.Value); } values.Add(rowValues); } return new Tablet( devicePath, metadata.Measurements, metadata.GetDataTypes(), values, timestamps ) { Tags = metadata.Tags.ToDictionary( t => t, t => typeof(T).GetProperty(t)?.GetValue(entities.First())?.ToString()) }; } /// /// 查询数据 /// /// /// /// public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuery(options); var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); var result = new PagedResult { TotalCount = await GetTotalCount(options), Items = ParseResults(sessionDataSet, options.PageSize) }; return result; } /// /// 构建查询语句 /// /// /// /// private string BuildQuery(QueryOptions options) where T : IoTEntity { var metadata = GetMetadata(); var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.Measurements); sb.Append($" FROM {DevicePathBuilder.BuildStorageGroupPath()}"); if (options.Conditions.Any()) { sb.Append(" WHERE "); sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); } sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}"); return sb.ToString(); } /// /// 将查询条件转换为SQL语句 /// /// /// /// private string TranslateCondition(QueryCondition condition) { return condition.Operator switch { ">" => $"{condition.Field} > {condition.Value}", "<" => $"{condition.Field} < {condition.Value}", "=" => $"{condition.Field} = '{condition.Value}'", _ => throw new NotSupportedException($"Operator {condition.Operator} not supported") }; } /// /// 获取查询条件的总数量 /// /// /// /// private async Task GetTotalCount(QueryOptions options) where T : IoTEntity { var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.BuildStorageGroupPath()}"; if (options.Conditions.Any()) { countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); } var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery); return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; } /// /// 解析查询结果 /// /// /// /// /// private IEnumerable ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() { var results = new List(); var metadata = GetMetadata(); while (dataSet.HasNext() && results.Count < pageSize) { var record = dataSet.Next(); var entity = new T { Timestamp = record.Timestamps }; foreach (var measurement in metadata.Measurements) { var value = record.GetValue(measurement); typeof(T).GetProperty(measurement)?.SetValue(entity, value); } results.Add(entity); } return results; } private async Task EnsureStorageGroupCreated(string storageGroup) { if (!await _sessionPool.CheckStorageGroupExists(storageGroup)) { await _sessionPool.SetStorageGroupAsync(storageGroup); } } /// /// 释放资源 /// public void Dispose() { _sessionPool?.Close().Wait(); } } }