From 7696b7379be5510416b7fe2f065e5a82cc234c54 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 7 Apr 2025 16:44:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0SessionPool=E7=81=B5=E6=B4=BB?= =?UTF-8?q?=E5=88=87=E6=8D=A2=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Samples/SampleAppService.cs | 106 ++------ src/JiShe.CollectBus.Host/appsettings.json | 7 +- .../CollectBusIoTDBModule.cs | 16 +- .../Context/IoTDBRuntimeContext.cs | 36 +++ .../Interface/IIoTDBProvider.cs | 14 +- .../Interface/IIoTDBSessionFactory.cs | 16 ++ .../Interface/IIoTDBSessionPool.cs | 35 +++ .../Options/IoTDBOptions.cs | 7 +- .../Options/QueryOptions.cs | 6 +- .../Provider/DevicePathBuilder.cs | 2 +- .../Provider/IoTDBProvider.cs | 231 ++++++++++-------- .../Provider/IoTDBSessionFactory.cs | 36 +++ .../Provider/IoTEntity.cs | 4 +- .../Provider/SessionPoolAdapter.cs | 74 ++++++ .../Provider/TableSessionPoolAdapter.cs | 72 ++++++ 15 files changed, 464 insertions(+), 198 deletions(-) create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Context/IoTDBRuntimeContext.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionPool.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs create mode 100644 src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 9b853fd..8990319 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -11,6 +11,7 @@ using JiShe.CollectBus.IotSystems.PrepayModel; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; +using JiShe.CollectBus.IoTDBProvider.Context; namespace JiShe.CollectBus.Samples; @@ -18,17 +19,20 @@ public class SampleAppService : CollectBusAppService, ISampleAppService { private readonly IIoTDBProvider _iotDBProvider; + private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDBOptions _options; - public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options) + public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options, + IoTDBRuntimeContext dbContext) { _iotDBProvider = iotDBProvider; _options = options.Value; + _dbContext = dbContext; } [HttpGet] - public async Task AddReadingAsync(int buildTabletMode = 1) + public async Task UseSessionPool() { ElectricityMeter meter = new ElectricityMeter() { @@ -38,95 +42,31 @@ public class SampleAppService : CollectBusAppService, ISampleAppService Current = 10, MeterModel = "DDZY-1980", ProjectCode = "10059", - Voltage = 10 + Voltage = 10, + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; - await _iotDBProvider.InsertAsync(meter, buildTabletMode); + await _iotDBProvider.InsertAsync(meter); } [HttpGet] - public async Task AddReading2Async() + public async Task UseTableSessionPool() { - var tableSessionPool = new TableSessionPool.Builder() - .SetNodeUrls(_options.ClusterList) - .SetUsername(_options.UserName) - .SetPassword(_options.Password) - .SetFetchSize(1024) - .Build(); + //_dbContext.UseTableSessionPool = true; + _iotDBProvider.SwitchSessionPool(true); - await tableSessionPool.Open(false); - - - - await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1"); - await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2"); - - //await tableSessionPool.ExecuteNonQueryStatementAsync("use test2"); - - //// or use full qualified table name - //await tableSessionPool.ExecuteNonQueryStatementAsync( - // "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)"); - - //await tableSessionPool.ExecuteNonQueryStatementAsync( - // "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)"); - - //// show tables from current database - //var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES"); - //res.ShowTableNames(); - //while (res.HasNext()) Console.WriteLine(res.Next()); - //await res.Close(); - - //// show tables by specifying another database - //// using SHOW tables FROM - //res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1"); - //res.ShowTableNames(); - //while (res.HasNext()) Console.WriteLine(res.Next()); - //await res.Close(); - - var tableName = "testTable1"; - List columnNames = - new List { - "region_id", - "plant_id", - "device_id", - "model", - "temperature", - "humidity" }; - List dataTypes = - new List{ - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.STRING, - TSDataType.FLOAT, - TSDataType.DOUBLE}; - List columnCategories = - new List{ - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.TAG, - ColumnCategory.ATTRIBUTE, - ColumnCategory.FIELD, - ColumnCategory.FIELD}; - var values = new List> { }; - var timestamps = new List { }; - for (long timestamp = 0; timestamp < 100; timestamp++) + ElectricityMeter meter = new ElectricityMeter() { - timestamps.Add(timestamp); - values.Add(new List { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp -}); - } - var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps); - await tableSessionPool.InsertAsync(tablet); - - - //var res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 " - // + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'"); - // res.ShowTableNames(); - // while (res.HasNext()) Console.WriteLine(res.Next()); - // await res.Close(); - - await tableSessionPool.Close(); + SystemName = "energy", + DeviceId = "402440506", + DeviceType = "Ammeter", + Current = 10, + MeterModel = "DDZY-1980", + ProjectCode = "10059", + Voltage = 10, + Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + await _iotDBProvider.InsertAsync(meter); } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 83eeb5b..3d51171 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -40,7 +40,7 @@ "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "Configuration": "118.190.144.92:6380,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "DefaultDB": "14", "HangfireDB": "15" }, @@ -78,7 +78,8 @@ "Password": "root", "ClusterList": [ "192.168.56.102:6667" ], "PoolSize": 2, - "DataBaseName": "root.energy2", - "OpenDebugMode": true + "DataBaseName": "energy", + "OpenDebugMode": true, + "UseTableSessionPoolByDefault": false } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs index 5a8902c..444ab4e 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/CollectBusIoTDBModule.cs @@ -1,4 +1,7 @@ -using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.IoTDBProvider.Context; +using JiShe.CollectBus.IoTDBProvider.Interface; +using JiShe.CollectBus.IoTDBProvider.Provider; +using Microsoft.Extensions.DependencyInjection; using System; using System.Collections.Generic; using System.Linq; @@ -13,7 +16,16 @@ namespace JiShe.CollectBus.IoTDBProvider public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.Configure(context.Services.GetConfiguration().GetSection(nameof(IoTDBOptions))); - context.Services.AddSingleton(); + + // 注册上下文为Scoped + context.Services.AddScoped(); + + // 注册Session工厂 + context.Services.AddSingleton(); + + // 注册Provider + context.Services.AddScoped(); + } } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Context/IoTDBRuntimeContext.cs b/src/JiShe.CollectBus.IoTDBProvider/Context/IoTDBRuntimeContext.cs new file mode 100644 index 0000000..41f48cb --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Context/IoTDBRuntimeContext.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider.Context +{ + /// + /// IoTDB SessionPool 运行时上下文 + /// + public class IoTDBRuntimeContext + { + private readonly bool _defaultValue; + + public IoTDBRuntimeContext(IOptions options) + { + _defaultValue = options.Value.UseTableSessionPoolByDefault; + UseTableSessionPool = _defaultValue; + } + + /// + /// 是否使用表模型存储, 默认false,使用tree模型存储 + /// + public bool UseTableSessionPool { get; set; } + + /// + /// 重置为默认值 + /// + public void ResetToDefault() + { + UseTableSessionPool = _defaultValue; + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs index 71cc741..b67f17d 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBProvider.cs @@ -7,27 +7,31 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IoTDBProvider { /// - /// IoTDB数据源 + /// IoTDB数据源,数据库能同时存多个时序模型,但数据是完全隔离的,不能跨时序模型查询,通过连接字符串配置 /// public interface IIoTDBProvider { + /// + /// 切换 SessionPool + /// + /// 是否使用表模型 + void SwitchSessionPool(bool useTableSession); + /// /// 插入数据 /// /// /// - /// 构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名 /// - Task InsertAsync(T entity, int buildTabletMode) where T : IoTEntity; + Task InsertAsync(T entity) where T : IoTEntity; /// /// 批量插入数据 /// /// /// - /// 构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名 /// - Task BatchInsertAsync(IEnumerable entities, int buildTabletMode) where T : IoTEntity; + Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity; /// diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs new file mode 100644 index 0000000..b904ba0 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionFactory.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider.Interface +{ + /// + /// Session 工厂接口 + /// + public interface IIoTDBSessionFactory + { + IIoTDBSessionPool GetSessionPool(bool useTableSession); + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionPool.cs b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionPool.cs new file mode 100644 index 0000000..be2c2b7 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Interface/IIoTDBSessionPool.cs @@ -0,0 +1,35 @@ +using Apache.IoTDB.DataStructure; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider.Interface +{ + /// + /// Session 连接池 + /// + public interface IIoTDBSessionPool : IDisposable + { + /// + /// 打开连接池 + /// + /// + Task OpenAsync(); + + /// + /// 插入数据 + /// + /// + /// + Task InsertAsync(Tablet tablet); + + /// + /// 查询数据 + /// + /// + /// + Task ExecuteQueryStatementAsync(string sql); + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs index e9be4b4..62cbd92 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus.IoTDBProvider public class IoTDBOptions { /// - /// 数据库名称 + /// 数据库名称,表模型才有,树模型为空 /// public string DataBaseName { get; set; } @@ -43,5 +43,10 @@ namespace JiShe.CollectBus.IoTDBProvider /// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。 /// public bool OpenDebugMode { get; set;} + + /// + /// 是否使用表模型存储, 默认false,使用tree模型存储 + /// + public bool UseTableSessionPoolByDefault { get; set; } = false; } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs index d04cd48..3600f12 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/QueryOptions.cs @@ -12,18 +12,20 @@ namespace JiShe.CollectBus.IoTDBProvider public class QueryOptions { /// - /// 表名或标签名 + /// 表模型的表名称或者树模型的设备路径 /// - public required string TableNameOrTagName { get; set; } + public required string TableNameOrTreePath { get; set; } /// /// 分页 /// public int Page { get; set; } + /// /// 分页大小 /// public int PageSize { get; set; } + /// /// 查询条件 /// diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs index cfab5c2..e170577 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/DevicePathBuilder.cs @@ -17,7 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - public static string GetDeviceId(T entity) where T : IoTEntity + public static string GetDevicePath(T entity) where T : IoTEntity { return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectCode}`.`{entity.DeviceId}`"; } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index eb83aca..4046ba9 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -2,6 +2,9 @@ using Apache.IoTDB.DataStructure; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.IoTDBProvider.Context; +using JiShe.CollectBus.IoTDBProvider.Interface; +using JiShe.CollectBus.IoTDBProvider.Provider; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -18,40 +21,54 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// IoTDB数据源 /// - public class IoTDBProvider : IIoTDBProvider, IDisposable + public class IoTDBProvider : IIoTDBProvider { - private readonly IoTDBOptions _options; - private readonly TableSessionPool _sessionPool; + private IIoTDBSessionPool _currentSession; private static readonly ConcurrentDictionary _metadataCache = new(); - private readonly ILogger _logger; + private readonly ILogger _logger; + private readonly IIoTDBSessionFactory _sessionFactory; + private readonly IoTDBRuntimeContext _runtimeContext; - public IoTDBProvider(IOptions options, ILogger logger) + public IoTDBProvider( + ILogger logger, + IIoTDBSessionFactory sessionFactory, + IoTDBRuntimeContext runtimeContext) { - _options = options.Value; - - _sessionPool = new TableSessionPool.Builder() -#if DEBUG - .SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[0]) - .SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[1]) -#else - .SetNodeUrls(_options.ClusterList) -#endif - .SetUsername(_options.UserName) - .SetPassword(_options.Password) - .SetFetchSize(_options.FetchSize) - .SetPoolSize(_options.PoolSize) - .SetDatabase(_options.DataBaseName) - .Build(); - - _sessionPool.Open(false).Wait(); - if (_options.OpenDebugMode) - { - _sessionPool.OpenDebugMode(builder => - { - builder.AddConsole(); - }); - } _logger = logger; + _sessionFactory = sessionFactory; + _runtimeContext = runtimeContext; + + RefreshSessionPool(); + } + + private void RefreshSessionPool() + { + try + { + _currentSession?.Dispose(); + _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); + _currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + //自动回退到备用SessionPool + _logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败,回退到默认配置:{ex.Serialize()}"); + _runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool; + _currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool); + } + } + + /// + /// 切换 SessionPool + /// + /// + public void SwitchSessionPool(bool useTableSession) + { + if (_runtimeContext.UseTableSessionPool != useTableSession) + { + _runtimeContext.UseTableSessionPool = useTableSession; + RefreshSessionPool(); + } } /// @@ -60,24 +77,13 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// /// - public async Task InsertAsync(T entity, int buildTabletMode) where T : IoTEntity + public async Task InsertAsync(T entity) where T : IoTEntity { var metadata = GetMetadata(); - var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode); - int result = 0; - if (buildTabletMode == 1) - { - result = await _sessionPool.InsertAsync(tablet); - } - else if(buildTabletMode == 2) - { - result = await _sessionPool.InsertAsync(tablet); - } - else - { - throw new Exception($"{nameof(InsertAsync)} 表模型{typeof(T).Name}记录入库时,buildTabletMode参数值不正确,请赋值以后重新处理。"); - } + var tablet = BuildTablet(new[] { entity }, metadata); + + int result = await _currentSession.InsertAsync(tablet); if (result <= 0) { @@ -89,9 +95,8 @@ namespace JiShe.CollectBus.IoTDBProvider /// 批量插入数据 /// /// - /// /// - public async Task BatchInsertAsync(IEnumerable entities, int buildTabletMode) where T : IoTEntity + public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity { var metadata = GetMetadata(); @@ -100,8 +105,8 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var batch in batches) { - var tablet = BuildTablet(batch, metadata, buildTabletMode); - var result = await _sessionPool.InsertAsync(tablet); + var tablet = BuildTablet(batch, metadata); + var result = await _currentSession.InsertAsync(tablet); if (result <= 0) { _logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。"); @@ -119,7 +124,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task DeleteAsync(QueryOptions options) where T : IoTEntity { var query = BuildDeleteSQL(options); - var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); + var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); if (!sessionDataSet.HasNext()) { @@ -132,9 +137,6 @@ namespace JiShe.CollectBus.IoTDBProvider return row.Values[0]; } - - - /// /// 查询数据 /// @@ -144,7 +146,7 @@ namespace JiShe.CollectBus.IoTDBProvider public async Task> QueryAsync(QueryOptions options) where T : IoTEntity, new() { var query = BuildQuerySQL(options); - var sessionDataSet = await _sessionPool.ExecuteQueryStatementAsync(query); + var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query); var result = new PagedResult { @@ -156,17 +158,17 @@ namespace JiShe.CollectBus.IoTDBProvider } /// - /// 构建表模型 + /// 构建Tablet /// /// /// 表实体 - /// 设备元数据 - /// 构建表模型方式,1 根据实体《T》直接显示指定Tag,2根据实体《T》的名称指定表名 + /// 设备元数据 /// - private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata, int buildTabletMode) where T : IoTEntity + private Tablet BuildTablet(IEnumerable entities, DeviceMetadata metadata) where T : IoTEntity { var timestamps = new List(); var values = new List>(); + var devicePaths = new HashSet(); foreach (var entity in entities) { @@ -175,40 +177,70 @@ namespace JiShe.CollectBus.IoTDBProvider foreach (var measurement in metadata.ColumnNames) { var value = typeof(T).GetProperty(measurement)?.GetValue(entity); - if (value == null) - { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。"); - } - rowValues.Add(value); + rowValues.Add(value ?? new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,属性{measurement}值为空,不符合IoTDB设计标准,请赋值以后重新处理。")); } values.Add(rowValues); + if (!_runtimeContext.UseTableSessionPool)//树模型 + { + devicePaths.Add(DevicePathBuilder.GetDevicePath(entity)); + } + else + { + devicePaths.Add(DevicePathBuilder.GetTableName()); + } } - if (buildTabletMode == 1) + if (devicePaths.Count > 1) { - return new Tablet( - DevicePathBuilder.GetDeviceId(entities.First()), - metadata.ColumnNames, - metadata.DataTypes, - values, - timestamps - ); - } - else if (buildTabletMode == 2) - { - return new Tablet( - DevicePathBuilder.GetTableName(), - metadata.ColumnNames, - metadata.ColumnCategories, - metadata.DataTypes, - values, - timestamps - ); - } - else - { - throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,buildTabletMode参数值不正确,请赋值以后重新处理。"); + throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。"); } + + return _runtimeContext.UseTableSessionPool + ? BuildTableSessionTablet(metadata, devicePaths.First(), values, timestamps) + : BuildSessionTablet(metadata, devicePaths.First(), values, timestamps); + } + + /// + /// 构建tree模型的Tablet + /// + /// + /// + /// + /// + /// + private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, + List> values, List timestamps) + { + return new Tablet( + devicePath, + metadata.ColumnNames, + metadata.DataTypes, + values, + timestamps + ); + } + + /// + /// 构建表模型的Tablet + /// + /// + /// + /// + /// + /// + private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string devicePath, + List> values, List timestamps) + { + var tablet = new Tablet( + devicePath, + metadata.ColumnNames, + metadata.ColumnCategories, + metadata.DataTypes, + values, + timestamps + ); + + return tablet; } /// @@ -222,7 +254,7 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = GetMetadata(); var sb = new StringBuilder("SELECT "); sb.AppendJoin(", ", metadata.ColumnNames); - sb.Append($" FROM {options.TableNameOrTagName}"); + sb.Append($" FROM {options.TableNameOrTreePath}"); if (options.Conditions.Any()) { @@ -243,9 +275,18 @@ namespace JiShe.CollectBus.IoTDBProvider private string BuildDeleteSQL(QueryOptions options) where T : IoTEntity { var metadata = GetMetadata(); - var sb = new StringBuilder("DELETE "); + var sb = new StringBuilder(); - sb.Append($" FROM {options.TableNameOrTagName}"); + if (!_runtimeContext.UseTableSessionPool) + { + sb.Append("DELETE "); + } + else + { + sb.Append("DROP "); + } + + sb.Append($" FROM {options.TableNameOrTreePath}"); sb.AppendJoin(", ", metadata.ColumnNames); @@ -283,13 +324,13 @@ namespace JiShe.CollectBus.IoTDBProvider /// private async Task GetTotalCount(QueryOptions options) where T : IoTEntity { - var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTagName}"; + var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}"; if (options.Conditions.Any()) { countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition)); } - var result = await _sessionPool.ExecuteQueryStatementAsync(countQuery); + var result = await _currentSession.ExecuteQueryStatementAsync(countQuery); return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0; } @@ -334,14 +375,6 @@ namespace JiShe.CollectBus.IoTDBProvider return results; } - /// - /// 释放资源 - /// - public void Dispose() - { - _sessionPool?.Close().Wait(); - } - /// /// 获取设备元数据 /// @@ -453,7 +486,7 @@ namespace JiShe.CollectBus.IoTDBProvider private TSDataType GetDataTypeFromTypeName(string typeName) { if (string.IsNullOrWhiteSpace(typeName)) - return TSDataType.STRING; + return TSDataType.STRING; return DataTypeMap.TryGetValue(typeName.Trim(), out var dataType) ? dataType diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs new file mode 100644 index 0000000..a9b462a --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBSessionFactory.cs @@ -0,0 +1,36 @@ +using JiShe.CollectBus.IoTDBProvider.Interface; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.IoTDBProvider.Provider +{ + + /// + /// 实现带缓存的Session工厂 + /// + public class IoTDBSessionFactory : IIoTDBSessionFactory + { + private readonly IoTDBOptions _options; + private readonly ConcurrentDictionary _pools = new(); + + public IoTDBSessionFactory(IOptions options) + { + _options = options.Value; + } + + public IIoTDBSessionPool GetSessionPool(bool useTableSession) + { + return _pools.GetOrAdd(useTableSession, key => + { + return key + ? new TableSessionPoolAdapter(_options) + : new SessionPoolAdapter(_options); + }); + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs index 45964fa..d9bf4fa 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTEntity.cs @@ -30,8 +30,8 @@ public string DeviceId { get; set; } /// - /// 当前时间戳,单位秒 + /// 当前时间戳,单位毫秒 /// - public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeSeconds(); + public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs new file mode 100644 index 0000000..81a69a9 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/SessionPoolAdapter.cs @@ -0,0 +1,74 @@ +using Apache.IoTDB.DataStructure; +using Apache.IoTDB; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using JiShe.CollectBus.IoTDBProvider.Interface; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.IoTDBProvider.Provider +{ + /// + /// 树模型连接池 + /// + public class SessionPoolAdapter : IIoTDBSessionPool + { + private readonly SessionPool _sessionPool; + private readonly IoTDBOptions _options; + + public SessionPoolAdapter(IoTDBOptions options) + { + _options = options; + _sessionPool = new SessionPool.Builder() + .SetNodeUrl(options.ClusterList) + .SetUsername(options.UserName) + .SetPassword(options.Password) + .SetFetchSize(options.FetchSize) + .SetPoolSize(options.PoolSize) + .Build(); + } + + /// + /// 打开连接池 + /// + /// + public async Task OpenAsync() + { + await _sessionPool.Open(false); + if (_options.OpenDebugMode) + { + _sessionPool.OpenDebugMode(builder => + { + builder.AddConsole(); + }); + } + } + + /// + /// 批量插入对齐时间序列数据 + /// + /// + /// + public async Task InsertAsync(Tablet tablet) + { + return await _sessionPool.InsertAlignedTabletAsync(tablet); + } + + /// + /// 查询数据 + /// + /// + /// + public async Task ExecuteQueryStatementAsync(string sql) + { + return await _sessionPool.ExecuteQueryStatementAsync(sql); + } + + public void Dispose() + { + _sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult(); + } + } +} diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs new file mode 100644 index 0000000..3676199 --- /dev/null +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/TableSessionPoolAdapter.cs @@ -0,0 +1,72 @@ +using Apache.IoTDB.DataStructure; +using Apache.IoTDB; +using JiShe.CollectBus.IoTDBProvider.Interface; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace JiShe.CollectBus.IoTDBProvider.Provider +{ + /// + /// 表模型Session连接池 + /// + public class TableSessionPoolAdapter : IIoTDBSessionPool + { + private readonly TableSessionPool _sessionPool; + private readonly IoTDBOptions _options; + + public TableSessionPoolAdapter(IoTDBOptions options) + { + _options = options; + _sessionPool = new TableSessionPool.Builder() + .SetNodeUrls(options.ClusterList) + .SetUsername(options.UserName) + .SetPassword(options.Password) + .SetFetchSize(options.FetchSize) + .SetPoolSize(options.PoolSize) + .SetDatabase(options.DataBaseName) + .Build(); + } + + /// + /// 打开连接池 + /// + /// + public async Task OpenAsync() + { + await _sessionPool.Open(false); + if (_options.OpenDebugMode) + { + _sessionPool.OpenDebugMode(builder => builder.AddConsole()); + } + } + + /// + /// 批量插入 + /// + /// + /// + public async Task InsertAsync(Tablet tablet) + { + return await _sessionPool.InsertAsync(tablet); + } + + /// + /// 查询数据 + /// + /// + /// + public async Task ExecuteQueryStatementAsync(string sql) + { + return await _sessionPool.ExecuteQueryStatementAsync(sql); + } + + public void Dispose() + { + _sessionPool?.Close().ConfigureAwait(false).GetAwaiter().GetResult(); + } + } +}