From c57bd15b92819204cb8807e08c4b98c039222fc5 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 2 Apr 2025 14:06:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85IoTDBProvider?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.sln | 7 + .../Attribute/ATTRIBUTEColumnAttribute.cs | 16 ++ .../Attribute/FIELDColumnAttribute.cs | 16 ++ .../Attribute/TAGColumnAttribute.cs | 16 ++ .../CollectBusIoTDBModule.cs | 19 ++ .../DeviceMetadata.cs | 24 ++ .../DevicePathBuilder.cs | 37 +++ .../IIoTDBProvider.cs | 38 +++ .../IoTDBProvider.cs | 262 ++++++++++++++++++ .../IoTEntity.cs | 19 ++ .../JiShe.CollectBus.IoTDBProvider.csproj | 12 + .../Options/IoTDBOptions.cs | 32 +++ .../Options/PagedResult.cs | 25 ++ .../Options/QueryCondition.cs | 27 ++ .../Options/QueryOptions.cs | 27 ++ 15 files changed, 577 insertions(+) create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/QueryCondition.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 674eca4..0a7a6e7 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.IoTDBProvider", "src\JiShe.CollectBus.IoTDBProvider\JiShe.CollectBus.IoTDBProvider.csproj", "{A3F3C092-0A25-450B-BF6A-5983163CBEF5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -89,6 +91,10 @@ Global {C06C4082-638F-2996-5FED-7784475766C1}.Debug|Any CPU.Build.0 = Debug|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.ActiveCfg = Release|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.Build.0 = Release|Any CPU + {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A3F3C092-0A25-450B-BF6A-5983163CBEF5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -107,6 +113,7 @@ Global {8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} + {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs new file mode 100644 index 0000000..42820bc --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// Column分类标记特性(TAG字段) + /// + [AttributeUsage(AttributeTargets.Property)] + public class TAGColumnAttribute : Attribute + { + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs new file mode 100644 index 0000000..2c48a65 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// Column分类标记特性(FIELD字段) + /// + [AttributeUsage(AttributeTargets.Property)] + public class FIELDColumnAttribute : Attribute + { + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs new file mode 100644 index 0000000..28f6a46 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// Column分类标记特性(ATTRIBUTE字段) + /// + [AttributeUsage(AttributeTargets.Property)] + public class ATTRIBUTEColumnAttribute : Attribute + { + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs new file mode 100644 index 0000000..5a8902c --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs @@ -0,0 +1,19 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Modularity; + +namespace JiShe.CollectBus.IoTDBProvider +{ + public class CollectBusIoTDBModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + context.Services.Configure(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions))); + context.Services.AddSingleton(); + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs new file mode 100644 index 0000000..dea65fb --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs @@ -0,0 +1,24 @@ +using Apache.IoTDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 设备元数据 + /// + public class DeviceMetadata + { + public List Measurements { get; } = new(); + public List Tags { get; } = new(); + + public List GetDataTypes() + { + // 根据实际类型映射TSDataType + return Measurements.Select(_ => TSDataType.TEXT).ToList(); + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs b/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs new file mode 100644 index 0000000..ef6d7e4 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 设备路径构建器 + /// + public static class DevicePathBuilder + { + /// + /// 构建存储组路径 + /// + /// + /// + public static string BuildStorageGroupPath() where T : IoTEntity + { + var type = typeof(T); + return $"root.{type.GetProperty("SystemName")?.Name}.{type.GetProperty("ProjectCode")?.Name}"; + } + + /// + /// 构建设备路径 + /// + /// + /// + /// + public static string BuildDevicePath(T entity) where T : IoTEntity + { + return $"root.{entity.SystemName}.{entity.ProjectCode}.{entity.DeviceId}"; + } + } + +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs new file mode 100644 index 0000000..026a95a --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// IoTDB数据源 + /// + public interface IIoTDBProvider + { + /// + /// 插入数据 + /// + /// + /// + /// + Task InsertAsync(T entity) where T : IoTEntity; + + /// + /// 批量插入数据 + /// + /// + /// + /// + Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity; + + /// + /// 查询数据 + /// + /// + /// + /// + Task> QueryAsync(QueryOptions options) where T : IoTEntity, new(); + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs new file mode 100644 index 0000000..f2a5546 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs @@ -0,0 +1,262 @@ +using Apache.IoTDB; +using Apache.IoTDB.DataStructure; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// IoTDB数据源 + /// + public class IoTDBProvider : IIoTDBProvider, IDisposable + { + private readonly IoTDBOptions _options; + private readonly SessionPool _sessionPool; + private static readonly ConcurrentDictionary _metadataCache = new(); + + public IoTDBProvider(IOptions options) + { + _options = options.Value; + _sessionPool = new SessionPool( + _options.ClusterList, + _options.UserName, + _options.Password, + _options.PoolSize); + _sessionPool.Open(false).Wait(); + } + + /// + /// 获取设备元数据 + /// + /// + /// + private DeviceMetadata GetMetadata() where T : IoTEntity + { + return _metadataCache.GetOrAdd(typeof(T), type => + { + var metadata = new DeviceMetadata(); + foreach (var prop in type.GetProperties()) + { + var attr = prop.GetCustomAttribute(); + if (attr != null) + { + metadata.Tags.Add(prop.Name); + } + else if (prop.Name != nameof(IoTEntity.Timestamp)) + { + metadata.Measurements.Add(prop.Name); + } + } + return metadata; + }); + } + + /// + /// 插入数据 + /// + /// + /// + /// + public async Task InsertAsync(T entity) where T : IoTEntity + { + var metadata = GetMetadata(); + var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); + await EnsureStorageGroupCreated(storageGroup); + + var tablet = BuildTablet(new[] { entity }, metadata); + await _sessionPool.InsertAlignedTabletAsync(tablet); + } + + /// + /// 批量插入数据 + /// + /// + /// + /// + public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity + { + var metadata = GetMetadata(); + var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); + await EnsureStorageGroupCreated(storageGroup); + + var batchSize = 1000; + var batches = entities.Chunk(batchSize); + + foreach (var batch in batches) + { + var tablet = BuildTablet(batch, metadata); + await _sessionPool.InsertAlignedTabletAsync(tablet); + } + } + + /// + /// 构建表模型 + /// + /// + /// + /// + /// + private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity + { + var devicePath = DevicePathBuilder.BuildDevicePath(entities.First()); + var timestamps = new List(); + var values = new List>(); + + foreach (var entity in entities) + { + timestamps.Add(entity.Timestamp); + var rowValues = new List(); + foreach (var measurement in metadata.Measurements) + { + var value = typeof(T).GetProperty(measurement)?.GetValue(entity); + rowValues.Add(value ?? DBNull.Value); + } + values.Add(rowValues); + } + + return new Tablet( + devicePath, + metadata.Measurements, + metadata.GetDataTypes(), + values, + timestamps + ) + { + Tags = metadata.Tags.ToDictionary( + t => t, + t => typeof(T).GetProperty(t)?.GetValue(entities.First())?.ToString()) + }; + } + + /// + /// 查询数据 + /// + /// + /// + /// + public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() + { + var query = BuildQuery(options); + var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); + + var result = new PagedResult + { + TotalCount = await GetTotalCount(options), + Items = ParseResults(sessionDataSet, options.PageSize) + }; + + return result; + } + + /// + /// 构建查询语句 + /// + /// + /// + /// + private string BuildQuery(QueryOptions options) where T : IoTEntity + { + var metadata = GetMetadata(); + var sb = new StringBuilder("SELECT "); + sb.AppendJoin(", ", metadata.Measurements); + sb.Append($" FROM {DevicePathBuilder.BuildStorageGroupPath()}"); + + if (options.Conditions.Any()) + { + sb.Append(" WHERE "); + sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition)); + } + + sb.Append($" LIMIT {options.PageSize} OFFSET {options.Page * options.PageSize}"); + return sb.ToString(); + } + + /// + /// 将查询条件转换为SQL语句 + /// + /// + /// + /// + private string TranslateCondition(QueryCondition condition) + { + return condition.Operator switch + { + ">" => $"{condition.Field} > {condition.Value}", + "<" => $"{condition.Field} < {condition.Value}", + "=" => $"{condition.Field} = '{condition.Value}'", + _ => throw new NotSupportedException($"Operator {condition.Operator} not supported") + }; + } + + /// + /// 获取查询条件的总数量 + /// + /// + /// + /// + private async Task GetTotalCount(QueryOptions options) where T : IoTEntity + { + var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.BuildStorageGroupPath()}"; + if (options.Conditions.Any()) + { + countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); + } + + var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery); + return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; + } + + /// + /// 解析查询结果 + /// + /// + /// + /// + /// + private IEnumerable ParseResults(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new() + { + var results = new List(); + var metadata = GetMetadata(); + + while (dataSet.HasNext() && results.Count < pageSize) + { + var record = dataSet.Next(); + var entity = new T + { + Timestamp = record.Timestamps + }; + + foreach (var measurement in metadata.Measurements) + { + var value = record.GetValue(measurement); + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } + + results.Add(entity); + } + return results; + } + + private async Task EnsureStorageGroupCreated(string storageGroup) + { + if (!await _sessionPool.CheckStorageGroupExists(storageGroup)) + { + await _sessionPool.SetStorageGroupAsync(storageGroup); + } + } + + /// + /// 释放资源 + /// + public void Dispose() + { + _sessionPool?.Close().Wait(); + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs b/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs new file mode 100644 index 0000000..7c277df --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs @@ -0,0 +1,19 @@ +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// IoT实体基类 + /// + public abstract class IoTEntity + { + [TAGColumn] + public string SystemName { get; set; } + + [TAGColumn] + public string ProjectCode { get; set; } + + [TAGColumn] + public string DeviceId { get; set; } + + public long Timestamp { get; set; } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj b/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj new file mode 100644 index 0000000..37504aa --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj @@ -0,0 +1,12 @@ + + + + net8.0 + enable + enable + + + + + + diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs new file mode 100644 index 0000000..5cdd256 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// IOTDB配置 + /// + public class IoTDBOptions + { + /// + /// 集群列表 + /// + public List ClusterList { get; set; } + /// + /// 用户名 + /// + public string UserName { get; set; } + /// + /// 密码 + /// + public string Password { get; set; } + + /// + /// 连接池大小 + /// + public int PoolSize { get; set; } = 3; + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs new file mode 100644 index 0000000..b707355 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/PagedResult.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 查询结果 + /// + /// + public class PagedResult + { + /// + /// 总条数 + /// + public int TotalCount { get; set; } + + /// + /// 数据集合 + /// + public IEnumerable Items { get; set; } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryCondition.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryCondition.cs new file mode 100644 index 0000000..b1aa53b --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryCondition.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 查询条件 + /// + public class QueryCondition + { + /// + /// 字段 + /// + public string Field { get; set; } + /// + /// 操作符 + /// + public string Operator { get; set; } + /// + /// 值 + /// + public object Value { get; set; } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs new file mode 100644 index 0000000..019bd28 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 查询条件 + /// + public class QueryOptions + { + /// + /// 分页 + /// + public int Page { get; set; } + /// + /// 分页大小 + /// + public int PageSize { get; set; } + /// + /// 查询条件 + /// + public List Conditions { get; } = new(); + } +}