dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
4 changed files with 128 additions and 11 deletions
Showing only changes of commit 5d0a2088ae - Show all commits

View File

@ -1,12 +1,16 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using Confluent.Kafka;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
namespace JiShe.CollectBus.Samples;
@ -14,10 +18,12 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
{
private readonly IIoTDBProvider _iotDBProvider;
private readonly IoTDBOptions _options;
public SampleAppService(IIoTDBProvider iotDBProvider)
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions<IoTDBOptions> options)
{
_iotDBProvider = iotDBProvider;
_options = options.Value;
}
@ -37,6 +43,93 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
await _iotDBProvider.InsertAsync(meter, buildTabletMode);
}
[HttpGet]
public async Task AddReading2Async()
{
var tableSessionPool = new TableSessionPool.Builder()
.SetNodeUrls(_options.ClusterList)
.SetUsername(_options.UserName)
.SetPassword(_options.Password)
.SetFetchSize(1024)
.Build();
await tableSessionPool.Open(false);
await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test1");
await tableSessionPool.ExecuteNonQueryStatementAsync("CREATE DATABASE test2");
//await tableSessionPool.ExecuteNonQueryStatementAsync("use test2");
//// or use full qualified table name
//await tableSessionPool.ExecuteNonQueryStatementAsync(
// "create table test1.table1(region_id STRING TAG, plant_id STRING TAG, device_id STRING TAG, model STRING ATTRIBUTE, temperature FLOAT FIELD, humidity DOUBLE FIELD) with (TTL=3600000)");
//await tableSessionPool.ExecuteNonQueryStatementAsync(
// "create table table2(region_id STRING TAG, plant_id STRING TAG, color STRING ATTRIBUTE, temperature FLOAT FIELD, speed DOUBLE FIELD) with (TTL=6600000)");
//// show tables from current database
//var res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES");
//res.ShowTableNames();
//while (res.HasNext()) Console.WriteLine(res.Next());
//await res.Close();
//// show tables by specifying another database
//// using SHOW tables FROM
//res = await tableSessionPool.ExecuteQueryStatementAsync("SHOW TABLES FROM test1");
//res.ShowTableNames();
//while (res.HasNext()) Console.WriteLine(res.Next());
//await res.Close();
var tableName = "testTable1";
List<string> columnNames =
new List<string> {
"region_id",
"plant_id",
"device_id",
"model",
"temperature",
"humidity" };
List<TSDataType> dataTypes =
new List<TSDataType>{
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.STRING,
TSDataType.FLOAT,
TSDataType.DOUBLE};
List<ColumnCategory> columnCategories =
new List<ColumnCategory>{
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.TAG,
ColumnCategory.ATTRIBUTE,
ColumnCategory.FIELD,
ColumnCategory.FIELD};
var values = new List<List<object>> { };
var timestamps = new List<long> { };
for (long timestamp = 0; timestamp < 100; timestamp++)
{
timestamps.Add(timestamp);
values.Add(new List<object> { "1", "5", "3", "A", 1.23F + timestamp, 111.1 + timestamp
});
}
var tablet = new Tablet(tableName, columnNames, columnCategories, dataTypes, values, timestamps);
await tableSessionPool.InsertAsync(tablet);
//var res = await tableSessionPool.ExecuteQueryStatementAsync("select * from testTable1 "
// + "where region_id = '1' and plant_id in ('3', '5') and device_id = '3'");
// res.ShowTableNames();
// while (res.HasNext()) Console.WriteLine(res.Next());
// await res.Close();
await tableSessionPool.Close();
}
public Task<SampleDto> GetAsync()
{
return Task.FromResult(

View File

@ -77,8 +77,8 @@
"UserName": "root",
"Password": "root",
"ClusterList": [ "192.168.56.102:6667" ],
"PoolSize": 10,
"DataBaseName": "energy",
"PoolSize": 2,
"DataBaseName": "root.energy2",
"OpenDebugMode": true
}
}

View File

@ -27,12 +27,17 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; }
public string Password { get; set; }
/// <summary>
/// 连接池大小
/// </summary>
public int PoolSize { get; set; } = 3;
public int PoolSize { get; set; } = 2;
/// <summary>
/// 查询时每次查询的数据量默认1024
/// </summary>
public int FetchSize { get; set; } = 1024;
/// <summary>
/// 是否开启调试模式,生产环境请关闭,因为底层的实现方式,可能会导致内存持续增长。

View File

@ -2,6 +2,7 @@
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
@ -22,18 +23,24 @@ namespace JiShe.CollectBus.IoTDBProvider
private readonly IoTDBOptions _options;
private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();
private readonly ILogger<IoTDBProvider> _logger;
private readonly ILogger<IoTDBProvider> _logger;
public IoTDBProvider(IOptions<IoTDBOptions> options, ILogger<IoTDBProvider> logger)
{
_options = options.Value;
_sessionPool = new TableSessionPool.Builder()
#if DEBUG
.SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[0])
.SetHost(_options.ClusterList.FirstOrDefault()?.Split(":")[1])
#else
.SetNodeUrls(_options.ClusterList)
#endif
.SetUsername(_options.UserName)
.SetPassword(_options.Password)
.SetFetchSize(_options.FetchSize)
.SetPoolSize(_options.PoolSize)
.SetDatabase(_options.DataBaseName)
.SetFetchSize(_options.PoolSize)
.Build();
_sessionPool.Open(false).Wait();
@ -58,10 +65,23 @@ namespace JiShe.CollectBus.IoTDBProvider
var metadata = GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode);
var result = await _sessionPool.InsertAsync(tablet);
if (result <=0)
int result = 0;
if (buildTabletMode == 1)
{
_logger.LogWarning($"{typeof(T).Name}插入数据没有成功");
result = await _sessionPool.InsertAsync(tablet);
}
else if(buildTabletMode == 2)
{
result = await _sessionPool.InsertAsync(tablet);
}
else
{
throw new Exception($"{nameof(InsertAsync)} 表模型{typeof(T).Name}记录入库时buildTabletMode参数值不正确请赋值以后重新处理。");
}
if (result <= 0)
{
_logger.LogError($"{typeof(T).Name}插入数据没有成功");
}
}
@ -169,7 +189,6 @@ namespace JiShe.CollectBus.IoTDBProvider
return new Tablet(
DevicePathBuilder.GetDeviceId(entities.First()),
metadata.ColumnNames,
metadata.ColumnCategories,
metadata.DataTypes,
values,
timestamps