diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 4801404..7083387 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -57,7 +57,6 @@ namespace JiShe.CollectBus.Plugins if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1)) { var tcpSessionClient = (ITcpSessionClient)client; - if ((AFN)aFn == AFN.链路接口检测) { diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs index 42820bc..1780bf9 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/ATTRIBUTEColumnAttribute.cs @@ -5,12 +5,12 @@ using System.Text; using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider -{ +{ /// - /// Column分类标记特性(TAG字段) + /// Column分类标记特性(ATTRIBUTE字段),也就是属性字段 /// [AttributeUsage(AttributeTargets.Property)] - public class TAGColumnAttribute : Attribute + public class ATTRIBUTEColumnAttribute : Attribute { } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs index 2c48a65..2f8470b 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/FIELDColumnAttribute.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// - /// Column分类标记特性(FIELD字段) + /// Column分类标记特性(FIELD字段),数据列字段 /// [AttributeUsage(AttributeTargets.Property)] public class FIELDColumnAttribute : Attribute diff --git a/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs index 28f6a46..b6149fe 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Attribute/TAGColumnAttribute.cs @@ -7,10 +7,10 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// - /// Column分类标记特性(ATTRIBUTE字段) + /// Column分类标记特性(TAG字段),标签字段 /// [AttributeUsage(AttributeTargets.Property)] - public class ATTRIBUTEColumnAttribute : Attribute + public class TAGColumnAttribute : Attribute { } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs deleted file mode 100644 index dea65fb..0000000 --- a/src/JiShe.CollectBus.IoTDBProvider/DeviceMetadata.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Apache.IoTDB; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.IoTDBProvider -{ - /// - /// 设备元数据 - /// - public class DeviceMetadata - { - public List Measurements { get; } = new(); - public List Tags { get; } = new(); - - public List GetDataTypes() - { - // 根据实际类型映射TSDataType - return Measurements.Select(_ => TSDataType.TEXT).ToList(); - } - } -} diff --git a/src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs similarity index 100% rename from src/JiShe.CollectBus.IoTDBProvider/IIoTDBProvider.cs rename to src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs b/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs deleted file mode 100644 index 7c277df..0000000 --- a/src/JiShe.CollectBus.IoTDBProvider/IoTEntity.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace JiShe.CollectBus.IoTDBProvider -{ - /// - /// IoT实体基类 - /// - public abstract class IoTEntity - { - [TAGColumn] - public string SystemName { get; set; } - - [TAGColumn] - public string ProjectCode { get; set; } - - [TAGColumn] - public string DeviceId { get; set; } - - public long Timestamp { get; set; } - } -} diff --git a/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj b/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj index 37504aa..717b0f6 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj +++ b/src/JiShe.CollectBus.IoTDBProvider/JiShe.CollectBus.IoTDBProvider.csproj @@ -6,7 +6,8 @@ enable - + + diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs new file mode 100644 index 0000000..8e84776 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DeviceMetadata.cs @@ -0,0 +1,30 @@ +using Apache.IoTDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// 设备元数据 + /// + public class DeviceMetadata + { + /// + /// 测量值集合,用于构建Table的测量值,也就是field参数 + /// + public List Measurements { get; } = new(); + + /// + /// 列类型集合,用于构建Table的列类型,也就是columnCategory参数 + /// + public List ColumnCategories { get; } = new(); + + /// + /// 值类型集合,用于构建Table的值类型,也就是dataType参数 + /// + public ListDataTypes { get; } = new(); + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs similarity index 72% rename from src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs rename to src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs index ef6d7e4..3166ca6 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/DevicePathBuilder.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs @@ -11,27 +11,29 @@ namespace JiShe.CollectBus.IoTDBProvider /// public static class DevicePathBuilder { - /// - /// 构建存储组路径 - /// - /// - /// - public static string BuildStorageGroupPath() where T : IoTEntity - { - var type = typeof(T); - return $"root.{type.GetProperty("SystemName")?.Name}.{type.GetProperty("ProjectCode")?.Name}"; - } - /// /// 构建设备路径 /// /// /// /// - public static string BuildDevicePath(T entity) where T : IoTEntity + public static string GetDeviceId(T entity) where T : IoTEntity { return $"root.{entity.SystemName}.{entity.ProjectCode}.{entity.DeviceId}"; } + + + /// + /// 获取表名称 + /// + /// + /// + /// + public static string GetTableName() where T : IoTEntity + { + var type = typeof(T); + return $"{type.Name}"; + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs similarity index 70% rename from src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs rename to src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index f2a5546..449884a 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -17,17 +17,20 @@ namespace JiShe.CollectBus.IoTDBProvider public class IoTDBProvider : IIoTDBProvider, IDisposable { private readonly IoTDBOptions _options; - private readonly SessionPool _sessionPool; + private readonly TableSessionPool _sessionPool; private static readonly ConcurrentDictionary _metadataCache = new(); public IoTDBProvider(IOptions options) { _options = options.Value; - _sessionPool = new SessionPool( - _options.ClusterList, - _options.UserName, - _options.Password, - _options.PoolSize); + + _sessionPool = new TableSessionPool.Builder() + .SetNodeUrls(_options.ClusterList) + .SetUsername(_options.UserName) + .SetPassword(_options.Password) + .SetFetchSize(_options.PoolSize) + .Build(); + _sessionPool.Open(false).Wait(); } @@ -43,14 +46,27 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = new DeviceMetadata(); foreach (var prop in type.GetProperties()) { - var attr = prop.GetCustomAttribute(); - if (attr != null) + //标签列 + var attrTAG = prop.GetCustomAttribute(); + if (attrTAG != null) { - metadata.Tags.Add(prop.Name); + metadata.ColumnCategories.Add(ColumnCategory.TAG); } - else if (prop.Name != nameof(IoTEntity.Timestamp)) + + //属性列 + var attrATTRIBUTE = prop.GetCustomAttribute(); + if (attrATTRIBUTE != null) { + metadata.ColumnCategories.Add(ColumnCategory.ATTRIBUTE); + } + + //数据列 + var attrFIELD = prop.GetCustomAttribute(); + if (attrFIELD != null) + { + metadata.ColumnCategories.Add(ColumnCategory.FIELD); metadata.Measurements.Add(prop.Name); + metadata.DataTypes.Add(GetDataTypeFromStr(prop.PropertyType.Name)); } } return metadata; @@ -66,11 +82,9 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task InsertAsync(T entity) where T : IoTEntity { var metadata = GetMetadata(); - var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); - await EnsureStorageGroupCreated(storageGroup); var tablet = BuildTablet(new[] { entity }, metadata); - await _sessionPool.InsertAlignedTabletAsync(tablet); + await _sessionPool.InsertAsync(tablet); } /// @@ -81,9 +95,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { - var metadata = GetMetadata(); - var storageGroup = DevicePathBuilder.BuildStorageGroupPath(); - await EnsureStorageGroupCreated(storageGroup); + var metadata = GetMetadata(); var batchSize = 1000; var batches = entities.Chunk(batchSize); @@ -91,7 +103,7 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var batch in batches) { var tablet = BuildTablet(batch, metadata); - await _sessionPool.InsertAlignedTabletAsync(tablet); + await _sessionPool.InsertAsync(tablet); } } @@ -104,34 +116,33 @@ namespace JiShe.CollectBus.IoTDBProvider /// private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { - var devicePath = DevicePathBuilder.BuildDevicePath(entities.First()); + var deviceId = DevicePathBuilder.GetDeviceId(entities.First()); var timestamps = new List(); var values = new List>(); foreach (var entity in entities) { - timestamps.Add(entity.Timestamp); + timestamps.Add(entity.Timestamps); var rowValues = new List(); foreach (var measurement in metadata.Measurements) { var value = typeof(T).GetProperty(measurement)?.GetValue(entity); - rowValues.Add(value ?? DBNull.Value); + if(value == null) + { + throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。"); + } + rowValues.Add(value); } values.Add(rowValues); } return new Tablet( - devicePath, + deviceId, metadata.Measurements, - metadata.GetDataTypes(), + metadata.DataTypes, values, timestamps - ) - { - Tags = metadata.Tags.ToDictionary( - t => t, - t => typeof(T).GetProperty(t)?.GetValue(entities.First())?.ToString()) - }; + ); } /// @@ -165,7 +176,7 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = GetMetadata(); var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.Measurements); - sb.Append($" FROM {DevicePathBuilder.BuildStorageGroupPath()}"); + sb.Append($" FROM {DevicePathBuilder.GetTableName()}"); if (options.Conditions.Any()) { @@ -202,7 +213,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// private async Task GetTotalCount(QueryOptions options) where T : IoTEntity { - var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.BuildStorageGroupPath()}"; + var countQuery = $"SELECT COUNT(*) FROM {DevicePathBuilder.GetTableName()}"; if (options.Conditions.Any()) { countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); @@ -224,33 +235,35 @@ namespace JiShe.CollectBus.IoTDBProvider var results = new List(); var metadata = GetMetadata(); + var properties = typeof(T).GetProperties(); + while (dataSet.HasNext() && results.Count < pageSize) { var record = dataSet.Next(); var entity = new T { - Timestamp = record.Timestamps + Timestamps = record.Timestamps }; + foreach (var measurement in metadata.Measurements) { - var value = record.GetValue(measurement); - typeof(T).GetProperty(measurement)?.SetValue(entity, value); + var value = record.Values; + + var prop = properties.FirstOrDefault(p => + p.Name.Equals(measurement, StringComparison.OrdinalIgnoreCase)); + if (prop != null) + { + typeof(T).GetProperty(measurement)?.SetValue(entity, value); + } + } results.Add(entity); } return results; } - - private async Task EnsureStorageGroupCreated(string storageGroup) - { - if (!await _sessionPool.CheckStorageGroupExists(storageGroup)) - { - await _sessionPool.SetStorageGroupAsync(storageGroup); - } - } - + /// /// 释放资源 /// @@ -258,5 +271,24 @@ namespace JiShe.CollectBus.IoTDBProvider { _sessionPool?.Close().Wait(); } + + private TSDataType GetDataTypeFromStr(string str) + { + return str switch + { + "BOOLEAN" => TSDataType.BOOLEAN, + "INT32" => TSDataType.INT32, + "INT64" => TSDataType.INT64, + "FLOAT" => TSDataType.FLOAT, + "DOUBLE" => TSDataType.DOUBLE, + "TEXT" => TSDataType.TEXT, + "NULLTYPE" => TSDataType.NONE, + "TIMESTAMP" => TSDataType.TIMESTAMP, + "DATE" => TSDataType.DATE, + "BLOB" => TSDataType.BLOB, + "STRING" => TSDataType.STRING, + _ => TSDataType.STRING + }; + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs new file mode 100644 index 0000000..2b090de --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs @@ -0,0 +1,37 @@ +namespace JiShe.CollectBus.IoTDBProvider +{ + /// + /// IoT实体基类 + /// + public abstract class IoTEntity + { + /// + /// 系统名称 + /// + [TAGColumn] + public string SystemName { get; set; } + + /// + /// 项目编码 + /// + [TAGColumn] + public string ProjectCode { get; set; } + + /// + /// 设备类型集中器、电表、水表、流量计、传感器等 + /// + [TAGColumn] + public string DeviceType { get; set; } + + /// + /// 设备ID + /// + [TAGColumn] + public string DeviceId { get; set; } + + /// + /// 时间戳 + /// + public long Timestamps { get; set; } + } +}