using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{
/// <summary>
/// IoTDB data source.
/// </summary>
public class IoTDBProvider : IIoTDBProvider, IDisposable
{
private readonly IoTDBOptions _options;
private readonly TableSessionPool _sessionPool;
private static readonly ConcurrentDictionary _metadataCache = new();
private readonly ILogger _logger;
/// <summary>
/// Creates the provider, builds the IoTDB table session pool from the supplied
/// options and opens it before returning.
/// </summary>
/// <param name="options">Connection settings (cluster node URLs, user name, password, pool size).</param>
/// <param name="logger">Logger stored for use by this provider.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/>, its <c>Value</c>, or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public IoTDBProvider(IOptions<IoTDBOptions> options, ILogger<IoTDBProvider> logger)
{
    _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // NOTE(review): PoolSize is handed to SetFetchSize here. That looks like it may
    // have been meant for a pool-size setter instead — confirm against the IoTDB
    // client builder API before changing; behavior is intentionally left as-is.
    _sessionPool = new TableSessionPool.Builder()
        .SetNodeUrls(_options.ClusterList)
        .SetUsername(_options.UserName)
        .SetPassword(_options.Password)
        .SetFetchSize(_options.PoolSize)
        .Build();

    // A constructor cannot await, so this still blocks until the pool is open.
    // GetAwaiter().GetResult() rethrows the original exception, whereas Wait()
    // would wrap it in an AggregateException.
    _sessionPool.Open(false).GetAwaiter().GetResult();
}
/// <summary>
/// Inserts a single entity: wraps it in a one-element array, builds a tablet
/// from it and writes that tablet through the session pool.
/// </summary>
/// <typeparam name="T">Entity type, constrained to <see cref="IoTEntity"/>.</typeparam>
/// <param name="entity">The entity to insert.</param>
/// <param name="buildTabletMode">Mode value forwarded unchanged to the tablet builder.</param>
/// <returns>A task that completes once the session pool insert call has finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entity"/> is <c>null</c>.</exception>
public async Task InsertAsync<T>(T entity, int buildTabletMode) where T : IoTEntity
{
    if (entity is null)
    {
        throw new ArgumentNullException(nameof(entity));
    }

    var metadata = GetMetadata<T>();
    var tablet = BuildTablet(new[] { entity }, metadata, buildTabletMode);

    // Library code: no need to resume on the caller's synchronization context.
    await _sessionPool.InsertAsync(tablet).ConfigureAwait(false);
}
/// <summary>
/// Inserts a sequence of entities in chunks: each chunk is turned into one
/// tablet and written through the session pool, one chunk after another.
/// </summary>
/// <typeparam name="T">Entity type, constrained to <see cref="IoTEntity"/>.</typeparam>
/// <param name="entities">The entities to insert. An empty sequence results in no insert calls.</param>
/// <param name="buildTabletMode">Mode value forwarded unchanged to the tablet builder.</param>
/// <returns>A task that completes once every chunk has been written.</returns>
/// <exception cref="ArgumentNullException"><paramref name="entities"/> is <c>null</c>.</exception>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities, int buildTabletMode) where T : IoTEntity
{
    if (entities is null)
    {
        throw new ArgumentNullException(nameof(entities));
    }

    // Maximum number of entities placed into a single tablet.
    const int batchSize = 1000;

    // Metadata depends only on T, so it is resolved once for all chunks.
    var metadata = GetMetadata<T>();

    foreach (var batch in entities.Chunk(batchSize))
    {
        var tablet = BuildTablet(batch, metadata, buildTabletMode);

        // Chunks are written sequentially; a failure stops the remaining chunks.
        await _sessionPool.InsertAsync(tablet).ConfigureAwait(false);
    }
}
///
/// 删除数据
///
///
///
///
public async Task