using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IoTDBProvider.Context;
using JiShe.CollectBus.IoTDBProvider.Interface;
using JiShe.CollectBus.IoTDBProvider.Provider;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{
///
/// IoTDB数据源
///
public class IoTDBProvider : IIoTDBProvider
{
private IIoTDBSessionPool _currentSession;
private static readonly ConcurrentDictionary _metadataCache = new();
private readonly ILogger _logger;
private readonly IIoTDBSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext;
public IoTDBProvider(
ILogger logger,
IIoTDBSessionFactory sessionFactory,
IoTDBRuntimeContext runtimeContext)
{
_logger = logger;
_sessionFactory = sessionFactory;
_runtimeContext = runtimeContext;
RefreshSessionPool();
}
private void RefreshSessionPool()
{
try
{
_currentSession?.Dispose();
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
_currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception ex)
{
//自动回退到备用SessionPool
_logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败,回退到默认配置:{ex.Serialize()}");
_runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool;
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
}
}
///
/// 切换 SessionPool
///
///
public void SwitchSessionPool(bool useTableSession)
{
if (_runtimeContext.UseTableSessionPool != useTableSession)
{
_runtimeContext.UseTableSessionPool = useTableSession;
RefreshSessionPool();
}
}
///
/// 插入数据
///
///
///
///
public async Task InsertAsync(T entity) where T : IoTEntity
{
var metadata = GetMetadata();
var tablet = BuildTablet(new[] { entity }, metadata);
int result = await _currentSession.InsertAsync(tablet);
if (result <= 0)
{
_logger.LogError($"{typeof(T).Name}插入数据没有成功");
}
}
///
/// 批量插入数据
///
///
///
public async Task BatchInsertAsync(IEnumerable entities) where T : IoTEntity
{
var metadata = GetMetadata();
var batchSize = 1000;
var batches = entities.Chunk(batchSize);
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
var result = await _currentSession.InsertAsync(tablet);
if (result <= 0)
{
_logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。");
}
}
}
///
/// 删除数据
///
///
///
///
public async Task