diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 4b62555..9b853fd 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -1,12 +1,16 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Apache.IoTDB.DataStructure; +using Apache.IoTDB; +using Confluent.Kafka; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.PrepayModel; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Options; namespace JiShe.CollectBus.Samples; @@ -14,10 +18,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService { private readonly IIoTDBProvider _iotDBProvider; + private readonly IoTDBOptions _options; - public SampleAppService(IIoTDBProvider iotDBProvider) + public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options) { _iotDBProvider = iotDBProvider; + _options = options.Value; } @@ -37,6 +43,93 @@ public class SampleAppService : CollectBusAppService, ISampleAppService await _iotDBProvider.InsertAsync(meter, buildTabletMode); } + + [HttpGet] + public async Task AddReading2Async() + { + var tableSessionPool = new TableSessionPool.Builder() + .SetNodeUrls(_options.ClusterList) + .SetUsername(_options.UserName) + .SetPassword(_options.Password) + .SetFetchSize(1024) + .Build(); + + await tableSessionPool.Open(false); + + + + await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1"); + await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2"); + + //await tableSessionPool.ExecuteNonQueryStatementAsync("use test2"); + + //// or use full qualified table name + //await tableSessionPool.ExecuteNonQueryStatementAsync( + // "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)"); + + //await tableSessionPool.ExecuteNonQueryStatementAsync( + // "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)"); + + //// show tables from current database + //var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES"); + //res.ShowTableNames(); + //while (res.HasNext()) Console.WriteLine(res.Next()); + //await res.Close(); + + //// show tables by specifying another database + //// using SHOW tables FROM + //res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1"); + //res.ShowTableNames(); + //while (res.HasNext()) Console.WriteLine(res.Next()); + //await res.Close(); + + var tableName = "testTable1"; + List columnNames = + new List { + "region_id", + "plant_id", + "device_id", + "model", + "temperature", + "humidity" }; + List dataTypes = + new List{ + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.DOUBLE}; + List columnCategories = + new List{ + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.ATTRIBUTE, + ColumnCategory.FIELD, + ColumnCategory.FIELD}; + var values = new List> { }; + var timestamps = new List { }; + for (long timestamp = 0; timestamp < 100; timestamp++) + { + timestamps.Add(timestamp); + values.Add(new List { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp +}); + } + var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps); + await tableSessionPool.InsertAsync(tablet); + + + //var res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 " + // + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'"); + // res.ShowTableNames(); + // while (res.HasNext()) Console.WriteLine(res.Next()); + // await res.Close(); + + await tableSessionPool.Close(); + } + + public Task GetAsync() { return Task.FromResult( diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 05c99de..83eeb5b 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -77,8 +77,8 @@ "UserName": "root", "Password": "root", "ClusterList": [ "192.168.56.102:6667" ], - "PoolSize": 10, - "DataBaseName": "energy", + "PoolSize": 2, + "DataBaseName": "root.energy2", "OpenDebugMode": true } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs index 6a762b1..e9be4b4 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Options/IoTDBOptions.cs @@ -27,12 +27,17 @@ namespace JiShe.CollectBus.IoTDBProvider /// /// 密码 /// - public string Password { get; set; } + public string Password { get; set; } /// /// 连接池大小 /// - public int PoolSize { get; set; } = 3; + public int PoolSize { get; set; } = 2; + + /// + /// 查询时,每次查询的数据量,默认1024 + /// + public int FetchSize { get; set; } = 1024; /// /// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。 diff --git a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs index b911876..eb83aca 100644 --- a/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs +++ b/src/JiShe.CollectBus.IoTDBProvider/Provider/IoTDBProvider.cs @@ -2,6 +2,7 @@ using Apache.IoTDB.DataStructure; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -22,18 +23,24 @@ namespace JiShe.CollectBus.IoTDBProvider private readonly IoTDBOptions _options; private readonly TableSessionPool _sessionPool; private static readonly ConcurrentDictionary _metadataCache = new(); - private readonly ILogger _logger; + private readonly ILogger _logger; public IoTDBProvider(IOptions options, ILogger logger) { _options = options.Value; _sessionPool = new TableSessionPool.Builder() +#if DEBUG + .SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[0]) + .SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[1]) +#else .SetNodeUrls(_options.ClusterList) +#endif .SetUsername(_options.UserName) .SetPassword(_options.Password) + .SetFetchSize(_options.FetchSize) + .SetPoolSize(_options.PoolSize) .SetDatabase(_options.DataBaseName) - .SetFetchSize(_options.PoolSize) .Build(); _sessionPool.Open(false).Wait(); @@ -58,10 +65,23 @@ namespace JiShe.CollectBus.IoTDBProvider var metadata = GetMetadata(); var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode); - var result = await _sessionPool.InsertAsync(tablet); - if (result <=0) + int result = 0; + if (buildTabletMode == 1) { - _logger.LogWarning($"{typeof(T).Name}插入数据没有成功"); + result = await _sessionPool.InsertAsync(tablet); + } + else if(buildTabletMode == 2) + { + result = await _sessionPool.InsertAsync(tablet); + } + else + { + throw new Exception($"{nameof(InsertAsync)} 表模型{typeof(T).Name}记录入库时,buildTabletMode参数值不正确,请赋值以后重新处理。"); + } + + if (result <= 0) + { + _logger.LogError($"{typeof(T).Name}插入数据没有成功"); } } @@ -169,7 +189,6 @@ namespace JiShe.CollectBus.IoTDBProvider return new Tablet( DevicePathBuilder.GetDeviceId(entities.First()), metadata.ColumnNames, - metadata.ColumnCategories, metadata.DataTypes, values, timestamps