优化IoTDB驱动

This commit is contained in:
ChenYi 2025-04-11 11:56:23 +08:00
parent b5f929910c
commit 2beca87bb2
9 changed files with 157 additions and 86 deletions

View File

@ -14,6 +14,7 @@ using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
namespace JiShe.CollectBus.Samples;
@ -67,8 +68,21 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
[HttpGet]
public async Task UseTableSessionPool()
{
//_dbContext.UseTableSessionPool = true;
_iotDBProvider.SwitchSessionPool(true);
ElectricityMeter meter2 = new ElectricityMeter()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectCode = "10059",
Voltage = 10,
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter = new ElectricityMeter()
{
@ -108,6 +122,27 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
}
/// <summary>
/// 测试单个测点数据项
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestSingleMeasuringAFNData(string measuring, object value)
{
SingleMeasuringAFNDataEntity meter = new SingleMeasuringAFNDataEntity()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectCode = "10059",
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, object>(measuring, value)
};
await _iotDBProvider.InsertAsync(meter);
}
public Task<SampleDto> GetAsync()
{
return Task.FromResult(

View File

@ -764,7 +764,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
foreach (var subItem in item)
{
@ -781,7 +781,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
NextTask = DateTime.Now.AddMinutes(1)
};
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");

View File

@ -10,8 +10,12 @@ namespace JiShe.CollectBus.IotSystems.AFNEntity
/// <summary>
/// AFN单项数据实体
/// </summary>
public class AFNDataEntity:IoTEntity
public class SingleMeasuringAFNDataEntity:IoTEntity
{
public string ItemCode { get; set; }
/// <summary>
/// 单项数据对象
/// </summary>
[SingleMeasuring(nameof(SingleMeasuring))]
public Tuple<string, object> SingleMeasuring { get; set; }
}
}

View File

@ -7,10 +7,16 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDBProvider
{
/// <summary>
/// 用于标识当前实体为单个测点单侧点标识字段类型是Dictionary<string,object>
/// 用于标识当前实体为单侧点模式单侧点模式只有一个Filed标识字段,类型是Tuple<string,object>,Item1=>测点名称Item2=>测点值
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class SingleMeasuringAttribute : Attribute
{
public string FieldName { get; set;}
public SingleMeasuringAttribute(string fieldName)
{
FieldName = fieldName;
}
}
}

View File

@ -11,11 +11,11 @@ namespace JiShe.CollectBus.IoTDBProvider
/// </summary>
public interface IIoTDBProvider
{
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="useTableSession">是否使用表模型</param>
void SwitchSessionPool(bool useTableSession);
///// <summary>
///// 切换 SessionPool
///// </summary>
///// <param name="useTableSession">是否使用表模型</param>
//void SwitchSessionPool(bool useTableSession);
/// <summary>
/// 插入数据

View File

@ -9,7 +9,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Interface
/// <summary>
/// Session 工厂接口
/// </summary>
public interface IIoTDBSessionFactory
public interface IIoTDBSessionFactory:IDisposable
{
IIoTDBSessionPool GetSessionPool(bool useTableSession);
}

View File

@ -12,6 +12,11 @@ namespace JiShe.CollectBus.IoTDBProvider
/// </summary>
public class DeviceMetadata
{
/// <summary>
/// 是否有单测量值
/// </summary>
public bool IsSingleMeasuring { get; set; }
/// <summary>
/// 测量值集合用于构建Table的测量值也就是columnNames参数
/// </summary>

View File

@ -24,12 +24,14 @@ namespace JiShe.CollectBus.IoTDBProvider
/// </summary>
public class IoTDBProvider : IIoTDBProvider
{
private IIoTDBSessionPool _currentSession;
private static readonly ConcurrentDictionary<Type, DeviceMetadata> _metadataCache = new();
private readonly ILogger<IoTDBProvider> _logger;
private readonly IIoTDBSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext;
private IIoTDBSessionPool CurrentSession =>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
public IoTDBProvider(
ILogger<IoTDBProvider> logger,
IIoTDBSessionFactory sessionFactory,
@ -39,37 +41,6 @@ namespace JiShe.CollectBus.IoTDBProvider
_sessionFactory = sessionFactory;
_runtimeContext = runtimeContext;
RefreshSessionPool();
}
private void RefreshSessionPool()
{
try
{
_currentSession?.Dispose();
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
_currentSession.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (Exception ex)
{
//自动回退到备用SessionPool
_logger.LogError($"{nameof(RefreshSessionPool)} 切换SessionPool失败回退到默认配置{ex.Serialize()}");
_runtimeContext.UseTableSessionPool = !_runtimeContext.UseTableSessionPool;
_currentSession = _sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
}
}
/// <summary>
/// 切换 SessionPool
/// </summary>
/// <param name="useTableSession"></param>
public void SwitchSessionPool(bool useTableSession)
{
if (_runtimeContext.UseTableSessionPool != useTableSession)
{
_runtimeContext.UseTableSessionPool = useTableSession;
RefreshSessionPool();
}
}
/// <summary>
@ -84,7 +55,7 @@ namespace JiShe.CollectBus.IoTDBProvider
var tablet = BuildTablet(new[] { entity }, metadata);
await _currentSession.InsertAsync(tablet);
await CurrentSession.InsertAsync(tablet);
//int result = await _currentSession.InsertAsync(tablet);
//if (result <= 0)
@ -108,7 +79,7 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
await _currentSession.InsertAsync(tablet);
await CurrentSession.InsertAsync(tablet);
//var result = await _currentSession.InsertAsync(tablet);
//if (result <= 0)
//{
@ -127,7 +98,7 @@ namespace JiShe.CollectBus.IoTDBProvider
public async Task<object> DeleteAsync<T>(QueryOptions options) where T : IoTEntity
{
var query = BuildDeleteSQL<T>(options);
var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
if (!sessionDataSet.HasNext())
{
@ -149,7 +120,7 @@ namespace JiShe.CollectBus.IoTDBProvider
public async Task<PagedResult<T>> QueryAsync<T>(QueryOptions options) where T : IoTEntity, new()
{
var query = BuildQuerySQL<T>(options);
var sessionDataSet = await _currentSession.ExecuteQueryStatementAsync(query);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
var result = new PagedResult<T>
{
@ -180,22 +151,45 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var measurement in metadata.ColumnNames)
{
PropertyInfo propertyInfo = typeof(T).GetProperty(measurement);
if (propertyInfo==null)
if (propertyInfo == null)
{
throw new Exception($"{nameof(BuildTablet)} 构建表模型{typeof(T).Name}时,没有找到{measurement}属性,属于异常情况。");
}
var value = propertyInfo.GetValue(entity);
if (value != null)
if (propertyInfo.IsDefined(typeof(SingleMeasuringAttribute), false))//表示当前对象是单测点模式,
{
rowValues.Add(value);
Tuple<string, object> valueTuple = (Tuple<string, object>)value!;
//获得当前Field对应的是的在measurement column中的序号然后直接更新 metadata 中datetype对应序号的数据类型
var indexOf = metadata.ColumnNames.IndexOf(measurement);
metadata.ColumnNames[indexOf] = valueTuple.Item1;
Type item2Type = valueTuple.Item2.GetType();
metadata.DataTypes[indexOf] = GetDataTypeFromTypeName(valueTuple.Item2.GetType().Name);
rowValues.Add(valueTuple.Item2);
//TSDataType singleMeasuringTSDataType =
}
else
{
DataTypeValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue);
rowValues.Add(defaultValue);
if (value != null)
{
rowValues.Add(value);
}
else
{
//填充默认数据值
DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue);
rowValues.Add(defaultValue);
}
}
}
values.Add(rowValues);
if (!_runtimeContext.UseTableSessionPool)//树模型
{
devicePaths.Add(DevicePathBuilder.GetDevicePath(entity));
@ -346,7 +340,7 @@ namespace JiShe.CollectBus.IoTDBProvider
countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
}
var result = await _currentSession.ExecuteQueryStatementAsync(countQuery);
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
}
@ -418,35 +412,40 @@ namespace JiShe.CollectBus.IoTDBProvider
foreach (var prop in type.GetProperties())
{
//先获取Tag标签和属性标签
ColumnInfo? column = prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? new ColumnInfo(
ColumnInfo? column = prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? new ColumnInfo(
name: prop.Name,
category: ColumnCategory.TAG,
dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), false
dataType: GetDataTypeFromTypeName(prop.PropertyType.Name),
false
) : prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null ? new ColumnInfo(
prop.Name,
ColumnCategory.ATTRIBUTE,
GetDataTypeFromTypeName(prop.PropertyType.Name),false
) : null;
//最先检查是不是单测点
if (prop.GetCustomAttribute<SingleMeasuringAttribute>() is not null)
{
//单测点的情况下字段类型是Dictionary<string,object>
Dictionary<string, object> keyValuePairs = prop.GetValue(null) as Dictionary<string, object>;
column = new ColumnInfo(
keyValuePairs.Keys.First(),
ColumnCategory.FIELD,
GetDataTypeFromTypeName(prop.PropertyType.Name), false
);
}
else
{
//不是单测点的情况下,直接获取字段名称作为测点名称
column = prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? new ColumnInfo(
GetDataTypeFromTypeName(prop.PropertyType.Name),
false
) : prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? new ColumnInfo(
prop.Name,
ColumnCategory.FIELD,
GetDataTypeFromTypeName(prop.PropertyType.Name), false
) : null;
GetDataTypeFromTypeName(prop.PropertyType.Name),
false)
: null;
//最先检查是不是单侧点模式
SingleMeasuringAttribute singleMeasuringAttribute = prop.GetCustomAttribute<SingleMeasuringAttribute>();
if (singleMeasuringAttribute != null && column == null)
{
//warning: 单侧点模式注意事项
//Entity实体 字段类型是 Tuple<string,object>,Item1=>测点名称Item2=>测点值
//只有一个Filed字段。
//MeasuringName 默认为 SingleMeasuringAttribute.FieldName。
//DateTye 默认为 string在获取对用的Value值的时候在进行重置。
column = new ColumnInfo(
singleMeasuringAttribute.FieldName,
ColumnCategory.FIELD,
GetDataTypeFromTypeName(singleMeasuringAttribute.FieldName),
true
);
}
////按优先级顺序检查属性,避免重复反射
@ -481,7 +480,13 @@ namespace JiShe.CollectBus.IoTDBProvider
{
var metadata = new DeviceMetadata();
//按业务逻辑顺序处理TAG -> FIELD -> ATTRIBUTE
//先检查是不是单侧点模型
if (columns.Any(c => c.IsSingleMeasuring))
{
metadata.IsSingleMeasuring = true;
}
//按业务逻辑顺序处理TAG -> ATTRIBUTE -> FIELD
var groupedColumns = columns
.GroupBy(c => c.Category)
.ToDictionary(g => g.Key, g => g.ToList());
@ -522,7 +527,7 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <summary>
/// 是否是单测点
/// </summary>
public bool IsSingleMeasuring { get;}
public bool IsSingleMeasuring { get; }
/// <summary>
/// 列类型
@ -534,7 +539,7 @@ namespace JiShe.CollectBus.IoTDBProvider
/// </summary>
public TSDataType DataType { get; }
public ColumnInfo(string name, ColumnCategory category, TSDataType dataType,bool isSingleMeasuring)
public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring)
{
Name = name;
Category = category;
@ -581,7 +586,7 @@ namespace JiShe.CollectBus.IoTDBProvider
/// <summary>
/// 根据类型名称获取 IoTDB 数据默认值
/// </summary>
private readonly IReadOnlyDictionary<string, object> DataTypeValueMap =
private readonly IReadOnlyDictionary<string, object> DataTypeDefaultValueMap =
new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
{
["BOOLEAN"] = false,

View File

@ -17,6 +17,7 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider
{
private readonly IoTDBOptions _options;
private readonly ConcurrentDictionary<bool, IIoTDBSessionPool> _pools = new();
private bool _disposed;
public IoTDBSessionFactory(IOptions<IoTDBOptions> options)
{
@ -25,12 +26,27 @@ namespace JiShe.CollectBus.IoTDBProvider.Provider
public IIoTDBSessionPool GetSessionPool(bool useTableSession)
{
if (_disposed) throw new ObjectDisposedException(nameof(IoTDBSessionFactory));
return _pools.GetOrAdd(useTableSession, key =>
{
return key
? new TableSessionPoolAdapter(_options)
var pool = key
? (IIoTDBSessionPool)new TableSessionPoolAdapter(_options)
: new SessionPoolAdapter(_options);
pool.OpenAsync().ConfigureAwait(false).GetAwaiter().GetResult(); ;
return pool;
});
}
public void Dispose()
{
foreach (var pool in _pools.Values)
{
pool.Dispose();
}
_pools.Clear();
_disposed = true;
}
}
}