964 lines
37 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Concurrent;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.Metadata.Ecma335;
using System.Text;
using System.Threading.Tasks;
using Apache.IoTDB;
using Apache.IoTDB.DataStructure;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Exceptions;
using System.Diagnostics.Metrics;
using Newtonsoft.Json.Linq;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Text.RegularExpressions;
namespace JiShe.CollectBus.IoTDB.Provider
{
/// <summary>
/// IoTDB数据源
/// </summary>
public class IoTDbProvider : IIoTDbProvider, ITransientDependency
{
// Static cache of device metadata keyed by entity type; shared by every provider instance.
private static readonly ConcurrentDictionary<Type, DeviceMetadata> MetadataCache = new();
private readonly ILogger<IoTDbProvider> _logger;
private readonly IIoTDbSessionFactory _sessionFactory;
private readonly IoTDBRuntimeContext _runtimeContext;
// Asks the session factory for a pool on every access, selected by the runtime context's UseTableSessionPool flag.
private IIoTDbSessionPool CurrentSession =>
_sessionFactory.GetSessionPool(_runtimeContext.UseTableSessionPool);
/// <summary>
/// Creates a provider that uses the supplied logger, session factory and runtime context.
/// </summary>
/// <param name="logger">Logger used for error and warning output.</param>
/// <param name="sessionFactory">Factory from which the current session pool is obtained.</param>
/// <param name="runtimeContext">Runtime context whose UseTableSessionPool flag selects the session pool.</param>
public IoTDbProvider(
    ILogger<IoTDbProvider> logger,
    IIoTDbSessionFactory sessionFactory,
    IoTDBRuntimeContext runtimeContext)
    => (_logger, _sessionFactory, _runtimeContext) = (logger, sessionFactory, runtimeContext);
/// <summary>
/// Inserts a single entity.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="entity">Entity to write.</param>
/// <returns>A task that completes when the insert has been issued.</returns>
public async Task InsertAsync<T>(T entity) where T : IoTEntity
{
    try
    {
        var deviceMetadata = await GetMetadata<T>();
        var tablets = BuildTablet(new[] { entity }, deviceMetadata);

        // Happy path first: write the first tablet that was built.
        if (tablets != null && tablets.Count > 0)
        {
            await CurrentSession.InsertAsync(tablets[0]);
            return;
        }

        _logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时发生异常");
        throw;
    }
}
/// <summary>
/// Inserts entities in batches, resolving the device metadata from the entity type.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="entities">Entities to write; they are split into chunks of 1000.</param>
/// <returns>A task that completes when every batch has been written.</returns>
public async Task BatchInsertAsync<T>(IEnumerable<T> entities) where T : IoTEntity
{
    const int batchSize = 1000;
    try
    {
        var metadata = await GetMetadata<T>();
        foreach (var batch in entities.Chunk(batchSize))
        {
            var tablets = BuildTablet(batch, metadata);
            if (tablets == null || tablets.Count <= 0)
            {
                // Report under this method's own name (the message previously named InsertAsync).
                _logger.LogError($"{nameof(BatchInsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
                return;
            }

            // A batch may produce several tablets; each is written separately.
            foreach (var tablet in tablets)
            {
                await CurrentSession.InsertAsync(tablet);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
        throw;
    }
}
/// <summary>
/// Inserts entities in batches using caller-supplied device metadata.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="deviceMetadata">Device metadata used to build the tablets.</param>
/// <param name="entities">Entities to write; they are split into chunks of 2000.</param>
/// <returns>A task that completes when every batch has been written.</returns>
public async Task BatchInsertAsync<T>(DeviceMetadata deviceMetadata, IEnumerable<T> entities) where T : IoTEntity
{
    const int batchSize = 2000;
    try
    {
        foreach (var batch in entities.Chunk(batchSize))
        {
            var tablets = BuildTablet(batch, deviceMetadata);
            if (tablets == null || tablets.Count <= 0)
            {
                // Report under this method's own name (the message previously named InsertAsync).
                _logger.LogError($"{nameof(BatchInsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
                return;
            }

            // A batch may produce several tablets; each is written separately.
            foreach (var tablet in tablets)
            {
                await CurrentSession.InsertAsync(tablet);
            }
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
        throw;
    }
}
/// <summary>
/// Deletes data matching the supplied options.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="options">Target table/path and filter conditions.</param>
/// <returns>
/// The first value of the first result row, or 0 when the statement returns no result set or no rows.
/// </returns>
public async Task<object> DeleteAsync<T>(IoTDBQueryOptions options) where T : IoTEntity
{
    try
    {
        var query = await BuildDeleteSQL<T>(options);
        var result = await CurrentSession.ExecuteQueryStatementAsync(query);
        if (result == null)
        {
            return 0;
        }

        try
        {
            if (!result.HasNext())
            {
                _logger.LogWarning($"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时,没有返回受影响记录数量。");
                return 0;
            }

            // Only the single result row is read.
            return result.Next().Values[0];
        }
        finally
        {
            // Close on every path, including the "no rows" return and exceptions.
            await result.Close();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, $"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时发生异常");
        throw;
    }
}
/// <summary>
/// Builds the device metadata for <typeparamref name="T"/> and stores it in the static metadata cache.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <returns>
/// The cached metadata instance for <typeparamref name="T"/>. On the first call this is the freshly built
/// instance; on later calls the already cached instance is returned with its ColumnNames and DataTypes refreshed.
/// </returns>
public async Task<DeviceMetadata> GetMetadata<T>() where T : IoTEntity
{
    var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
    var columns = CollectColumnMetadata<T>(accessor);
    var freshMetadata = BuildDeviceMetadata<T>(columns, accessor);

    var pathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
    freshMetadata.EntityName = accessor.EntityName;
    freshMetadata.EntityType = accessor.EntityType;
    freshMetadata.TableNameOrTreePath = pathAttribute != null
        ? pathAttribute.TableNameOrTreePath
        : string.Empty;

    var metaData = MetadataCache.AddOrUpdate(
        typeof(T),
        addValueFactory: _ => freshMetadata,
        updateValueFactory: (_, existingValue) =>
        {
            // Reuse the metadata built above instead of building it a second time;
            // only the column names and data types of the cached entry are refreshed.
            existingValue.ColumnNames = freshMetadata.ColumnNames;
            existingValue.DataTypes = freshMetadata.DataTypes;
            return existingValue;
        });

    // Kept as an awaited completed task so exceptions still surface through the returned Task.
    return await Task.FromResult(metaData);
}
/// <summary>
/// Runs a paged query and maps the rows to entities.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="options">Target table/path, filter conditions and paging values.</param>
/// <returns>
/// A paged result holding the parsed items, the total count and a HasNext flag that is true
/// when the page came back full (item count is not less than the page size).
/// </returns>
public async Task<BusPagedResult<T>> QueryAsync<T>(IoTDBQueryOptions options) where T : IoTEntity, new()
{
    try
    {
        var query = await BuildQuerySQL<T>(options);
        var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);

        // Read (and thereby close) the page data set before issuing the count query,
        // so a failing count query cannot leave the page data set open.
        var items = await ParseResults<T>(sessionDataSet, options.PageSize);

        var result = new BusPagedResult<T>
        {
            TotalCount = await GetTotalCount<T>(options),
            Items = items,
            PageIndex = options.PageIndex,
            PageSize = options.PageSize,
        };

        result.HasNext = result.Items.Count() >= result.PageSize;
        return result;
    }
    catch (Exception ex)
    {
        // NOTE(review): this disposes whatever the factory returns for the current pool; the other
        // methods in this class do not do this on failure. Confirm the pool is not shared before relying on it.
        CurrentSession.Dispose();
        _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询{typeof(T).Name}的数据时发生异常");
        throw;
    }
}
/// <summary>
/// Builds the tablets needed to store the given entities.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="entities">Entities to store.</param>
/// <param name="metadata">Device metadata describing the entity's columns.</param>
/// <returns>
/// Null when <paramref name="entities"/> is empty. For the table model, a list with one tablet;
/// for the tree model, one tablet per distinct DevicePath.
/// </returns>
/// <exception cref="ArgumentException">
/// The entity type is unspecified or Other, or the entity model does not match the active session pool kind.
/// </exception>
private List<Tablet> BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{
    var entityList = entities.ToList();
    if (entityList.Count <= 0)
    {
        return null;
    }

    // typeof(T).Name is used in the messages below; nameof(T) would only ever print the letter "T".
    if (metadata.EntityType == null)
    {
        throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {typeof(T).Name}的EntityType 没有指定,属于异常情况,-101");
    }

    if (metadata.EntityType == EntityTypeEnum.Other)
    {
        throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {typeof(T).Name}的EntityType 不属于IoTDB数据模型实体属于异常情况-102");
    }

    if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true)
    {
        throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103");
    }
    else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false)
    {
        throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
    }

    if (_runtimeContext.UseTableSessionPool)
    {
        // Table model: use the explicitly configured name when present, otherwise derive it from T.
        var tableName = !string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath)
            ? metadata.TableNameOrTreePath
            : DevicePathBuilder.GetTableName<T>();

        return new List<Tablet>() { BuildTablet(entityList, metadata, tableName) };
    }

    // Tree model: entities may carry different device paths, so each path gets its own tablet.
    var tabletList = new List<Tablet>();
    foreach (var group in entityList.GroupBy(d => d.DevicePath).ToList())
    {
        tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key));
    }

    return tabletList;
}
/// <summary>
/// Builds one tablet for the given entities under the given table name or tree path.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="entities">Entities that become the tablet's rows.</param>
/// <param name="metadata">Device metadata providing column names and per-column processors.</param>
/// <param name="tableNameOrTreePath">Table name (table model) or device path (tree model).</param>
/// <returns>The tablet produced by the table-model or tree-model builder, depending on the active session pool.</returns>
private Tablet BuildTablet<T>(List<T> entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity
{
    var columnCount = metadata.ColumnNames.Count;

    // Working copy of the column names: single-measuring columns overwrite their slot
    // with the name read from the entity, so the last processed row determines the final name.
    var measurementNames = new List<string>(metadata.ColumnNames);
    var timestampList = new List<long>(entities.Count);
    var valueRows = new List<List<object>>(entities.Count);

    // Rows are processed sequentially, in input order.
    foreach (var entity in entities)
    {
        timestampList.Add(entity.Timestamps);

        var cells = new List<object>(columnCount);
        for (var col = 0; col < columnCount; col++)
        {
            var processor = metadata.Processors[col];
            if (processor.IsSingleMeasuring)
            {
                measurementNames[col] = (string)processor.SingleMeasuringNameGetter(entity);
            }

            // Read the value through the processor's getter (which applies its converter).
            cells.Add(processor.ValueGetter(entity));
        }

        valueRows.Add(cells);
    }

    if (_runtimeContext.UseTableSessionPool)
    {
        return BuildTableSessionTablet(metadata, tableNameOrTreePath, measurementNames, valueRows, timestampList);
    }

    return BuildSessionTablet(metadata, tableNameOrTreePath, measurementNames, valueRows, timestampList);
}
/// <summary>
/// Builds a tree-model tablet.
/// </summary>
/// <param name="metadata">Parsed device metadata; only its DataTypes are used here.</param>
/// <param name="devicePath">Device path.</param>
/// <param name="columns">Column (measurement) names.</param>
/// <param name="values">Row values.</param>
/// <param name="timestamps">Row timestamps.</param>
/// <returns>The constructed tablet.</returns>
// TODO: the tree model should drop TAG and ATTRIBUTE columns and keep only FIELD columns.
private Tablet BuildSessionTablet(DeviceMetadata metadata, string devicePath, List<string> columns, List<List<object>> values, List<long> timestamps)
    => new Tablet(devicePath, columns, metadata.DataTypes, values, timestamps);
/// <summary>
/// Builds a table-model tablet.
/// </summary>
/// <param name="metadata">Parsed device metadata; its ColumnCategories and DataTypes are used here.</param>
/// <param name="tableName">Table name.</param>
/// <param name="columns">Column names.</param>
/// <param name="values">Row values.</param>
/// <param name="timestamps">Row timestamps.</param>
/// <returns>The constructed tablet.</returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<string> columns, List<List<object>> values, List<long> timestamps)
    => new Tablet(tableName, columns, metadata.ColumnCategories, metadata.DataTypes, values, timestamps);
/// <summary>
/// Builds the paged SELECT statement for entity type T.
/// </summary>
/// <typeparam name="T">Entity type whose metadata supplies the selected column names.</typeparam>
/// <param name="options">Target table/path, filter conditions and paging values.</param>
/// <returns>The SELECT statement, with conditions joined by AND and a LIMIT/OFFSET suffix.</returns>
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{
    var metadata = await GetMetadata<T>();

    var selectList = string.Join(", ", metadata.ColumnNames);
    var target = options.TableNameOrTreePath;

    // The WHERE clause is emitted only when at least one condition exists.
    var whereClause = options.Conditions.Any()
        ? " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition))
        : string.Empty;

    // OFFSET is PageIndex * PageSize.
    var paging = $" LIMIT {options.PageSize} OFFSET {options.PageIndex * options.PageSize}";

    return "SELECT " + selectList + $" FROM {target}" + whereClause + paging;
}
/// <summary>
/// Builds the DELETE statement for the given options.
/// </summary>
/// <remarks>
/// The previous implementation appended the entity's column names directly after the target
/// (no separator) and emitted "DROP  FROM" for the table model, so the statement was malformed.
/// The statement is now "DELETE FROM target [WHERE ...]" for both models.
/// NOTE(review): for the tree model this assumes options.TableNameOrTreePath is already a
/// time-series path or path pattern accepted by IoTDB's DELETE — confirm against callers.
/// </remarks>
/// <typeparam name="T">Entity type (not used to shape the statement).</typeparam>
/// <param name="options">Target table/path and filter conditions.</param>
/// <returns>A completed task holding the DELETE statement.</returns>
private Task<string> BuildDeleteSQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{
    var sb = new StringBuilder("DELETE FROM ");
    sb.Append(options.TableNameOrTreePath);

    if (options.Conditions.Any())
    {
        sb.Append(" WHERE ");
        sb.AppendJoin(" AND ", options.Conditions.Select(TranslateCondition));
    }

    return Task.FromResult(sb.ToString());
}
/// <summary>
/// Renders one query condition as a SQL comparison.
/// </summary>
/// <param name="condition">Condition to render; only the operators &gt;, &lt; and = are accepted.</param>
/// <returns>"Field op Value", with the value wrapped in single quotes unless the condition is numeric.</returns>
/// <exception cref="NotSupportedException">The operator is not one of &gt;, &lt; or =.</exception>
private string TranslateCondition(QueryCondition condition)
{
    if (condition.Operator is not (">" or "<" or "="))
    {
        throw new NotSupportedException($"{nameof(TranslateCondition)} 将查询条件转换为SQL语句时操作符 {condition.Operator} 属于异常情况");
    }

    // Numeric values are emitted bare; everything else is quoted.
    var operand = condition.IsNumber ? $"{condition.Value}" : $"'{condition.Value}'";
    return $"{condition.Field} {condition.Operator} {operand}";
}
/// <summary>
/// Counts the rows that match the query conditions.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="options">Target table/path and filter conditions.</param>
/// <returns>The count read from the first row, or 0 when there is no result set or no row.</returns>
private async Task<int> GetTotalCount<T>(IoTDBQueryOptions options) where T : IoTEntity
{
    var countQuery = $"SELECT COUNT(*) FROM {options.TableNameOrTreePath}";
    if (options.Conditions.Any())
    {
        countQuery += " WHERE " + string.Join(" AND ", options.Conditions.Select(TranslateCondition));
    }

    var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
    if (result == null)
    {
        return 0;
    }

    try
    {
        return result.HasNext() ? Convert.ToInt32(result.Next().Values[0]) : 0;
    }
    finally
    {
        // Close on every path, including the empty result and conversion failures.
        await result.Close();
    }
}
/// <summary>
/// Maps the rows of a query result to entities and closes the data set.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="dataSet">Query result; a null data set yields an empty list.</param>
/// <param name="pageSize">Maximum number of rows to read.</param>
/// <returns>The parsed entities, at most <paramref name="pageSize"/> of them.</returns>
private async Task<IEnumerable<T>> ParseResults<T>(SessionDataSet dataSet, int pageSize) where T : IoTEntity, new()
{
    var results = new List<T>();
    if (dataSet == null)
    {
        return results;
    }

    try
    {
        var metadata = await GetMetadata<T>();
        while (dataSet.HasNext() && results.Count < pageSize)
        {
            var record = dataSet.Next();
            var entity = new T
            {
                Timestamps = record.Timestamps
            };

            for (int i = 0; i < metadata.Processors.Count; i++)
            {
                // BuildDeviceMetadata assigns no ValueSetter to single-measuring processors;
                // skip those instead of failing on a null delegate.
                metadata.Processors[i].ValueSetter?.Invoke(entity, record.Values[i]);
            }

            results.Add(entity);
        }
    }
    finally
    {
        // Close the data set even when mapping a row throws.
        await dataSet.Close();
    }

    return results;
}
/// <summary>
/// Collects the column descriptors of an entity from its accessor's member list.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="accessor">Entity accessor whose members are inspected.</param>
/// <returns>One descriptor per member carrying a TAG, ATTRIBUTE, FIELD or single-measuring attribute.</returns>
/// <exception cref="Exception">A single-measuring member has no "Item2" sub-member.</exception>
private List<ColumnInfo> CollectColumnMetadata<T>(ISourceEntityAccessor<T> accessor)
{
    var collected = new List<ColumnInfo>();
    var membersByPath = BuildMemberCache(accessor);

    foreach (var member in accessor.MemberList)
    {
        // Tuple sub-items are resolved through their parent member, never on their own.
        if (member.NameOrPath.Contains(".Item"))
        {
            continue;
        }

        // For Nullable<X> the name of X is used; otherwise the declared type name.
        var nullableInner = Nullable.GetUnderlyingType(member.DeclaredType);
        string typeName = nullableInner?.Name ?? member.DeclaredTypeName;

        var memberAttributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
        bool isTag = memberAttributes.OfType<TAGColumnAttribute>().Any();
        bool isAttribute = memberAttributes.OfType<ATTRIBUTEColumnAttribute>().Any();
        bool isField = memberAttributes.OfType<FIELDColumnAttribute>().Any();
        bool isSingleMeasuring = memberAttributes.OfType<SingleMeasuringAttribute>().Any();

        // Category attributes take precedence in the order TAG, ATTRIBUTE, FIELD.
        if (isTag || isAttribute || isField)
        {
            var category = isTag
                ? ColumnCategory.TAG
                : isAttribute ? ColumnCategory.ATTRIBUTE : ColumnCategory.FIELD;

            collected.Add(new ColumnInfo(member.NameOrPath, category, GetDataTypeFromTypeName(typeName), false, member.DeclaredTypeName));
            continue;
        }

        if (!isSingleMeasuring)
        {
            continue;
        }

        // Single-measuring members: the data type comes from the tuple's second item.
        var item2Path = $"{member.NameOrPath}.Item2";
        if (!membersByPath.TryGetValue(item2Path, out var item2Member))
        {
            throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常");
        }

        collected.Add(new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(item2Member.DeclaredTypeName), true, item2Member.DeclaredTypeName));
    }

    return collected;
}
/// <summary>
/// Builds the device metadata (column names, categories, data types and one processor per column) for an entity.
/// </summary>
/// <param name="columns">Column descriptors already collected for the entity.</param>
/// <param name="accessor">Entity accessor whose MemberList supplies the getters and setters bound into each processor.</param>
/// <returns>A new DeviceMetadata whose columns are ordered TAG, then ATTRIBUTE, then FIELD.</returns>
private DeviceMetadata BuildDeviceMetadata<T>(List<ColumnInfo> columns, ISourceEntityAccessor<T> accessor) where T : IoTEntity
{
var metadata = new DeviceMetadata();
// First check whether this is a single-measuring model.
if (columns.Any(c => c.IsSingleMeasuring))
{
metadata.IsSingleMeasuring = true;
}
// Process categories in business order: TAG -> ATTRIBUTE -> FIELD.
var groupedColumns = columns
.GroupBy(c => c.Category)
.ToDictionary(g => g.Key, g => g.ToList());
ProcessCategory(groupedColumns, ColumnCategory.TAG, metadata);
ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata);
ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata);
// Create one processor per column, in the same order as metadata.ColumnNames.
foreach (var item in metadata.ColumnNames)
{
// ColumnInfo is a struct, so FirstOrDefault yields a default value (null Name) when nothing matches.
ColumnInfo column = columns.FirstOrDefault(d => d.Name == item);
var processor = new ColumnProcessor
{
ColumnName = column.Name,
IsSingleMeasuring = column.IsSingleMeasuring,
// The get-converter is chosen from the declared type name; the set-converter from the column name.
GetConverter = GetterConverter(column.DeclaredTypeName.ToUpper()),
SetConverter = SetterConverter(column.Name.ToUpper()),
TSDataType = column.DataType,
};
// Single-measuring column: the name comes from "<column>.Item1" and the value from "<column>.Item2".
if (column.IsSingleMeasuring)
{
var item1Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item1");
processor.SingleMeasuringNameGetter = (obj) =>
{
// Read the raw value, convert it to a string and validate it as a measurement name.
object rawValue = item1Member.Getter(obj);
string value = rawValue?.ToString();
ValidateSingleMeasuringName(value);
return value;
};
var item2Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item2");
processor.ValueGetter = (obj) => {
object rawValue = item2Member.Getter(obj);
return processor.GetConverter(rawValue);
};
// NOTE(review): no ValueSetter is assigned in this branch — confirm whether reads of single-measuring entities are expected to work.
}
else
{
// Look up the member accessor that matches the column name.
var member = accessor.MemberList.First(m => m.NameOrPath == column.Name);
processor.ValueGetter = (obj) => {
object rawValue = member.Getter(obj);
return processor.GetConverter(rawValue);
};
// Assign the (converted) value to the matching member.
processor.ValueSetter = (obj, value) =>
{
// NOTE(review): tempValue is computed but not referenced afterwards; the setter converts the original value instead.
dynamic tempValue = GetTSDataValue(processor.TSDataType, value);
var rawValue = processor.SetConverter(value);
member.Setter(obj, rawValue);
};
}
metadata.Processors.Add(processor);
}
return metadata;
}
/// <summary>
/// Validates the format of a single-measuring name.
/// </summary>
/// <remarks>
/// Null, empty and whitespace-only names are accepted without checks. Otherwise the name must consist
/// of ASCII letters and digits only, must not be all digits, and must not start with a digit.
/// The all-digits check runs before the leading-digit check; in the opposite order it could never fire,
/// because an all-digit name always starts with a digit.
/// </remarks>
/// <param name="value">Name to validate.</param>
/// <exception cref="InvalidOperationException">The name violates one of the rules above.</exception>
private void ValidateSingleMeasuringName(string value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return;
    }

    static bool IsAsciiDigit(char c) => c >= '0' && c <= '9';
    static bool IsAsciiLetter(char c) => (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');

    // Rule 1: only ASCII letters and digits are allowed.
    if (value.Any(c => !IsAsciiLetter(c) && !IsAsciiDigit(c)))
    {
        throw new InvalidOperationException(
            $"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字");
    }

    // Rule 2: the name must not consist of digits only.
    if (value.All(IsAsciiDigit))
    {
        throw new InvalidOperationException(
            $"SingleMeasuring name '{value}' 不能全为数字");
    }

    // Rule 3: the first character must not be a digit.
    if (IsAsciiDigit(value[0]))
    {
        throw new InvalidOperationException(
            $"SingleMeasuring name '{value}' 不能以数字开头");
    }
}
/// <summary>
/// Selects the converter applied to a member value when it is read for writing.
/// </summary>
/// <param name="declaredTypeName">Declared type name of the member; compared against "DATETIME" exactly.</param>
/// <returns>
/// For "DATETIME", a converter that casts the value to DateTime and returns
/// GetDateTimeOffset().ToUnixTimeNanoseconds(); otherwise a converter that returns the value unchanged.
/// </returns>
private Func<object, object> GetterConverter(string declaredTypeName)
{
    if (declaredTypeName == "DATETIME")
    {
        return raw => ((DateTime)raw).GetDateTimeOffset().ToUnixTimeNanoseconds();
    }

    return raw => raw;
}
/// <summary>
/// Selects the converter applied to a stored value before it is assigned to a member.
/// </summary>
/// <remarks>
/// The "time" suffix is matched with an ordinal, case-insensitive comparison. The previous
/// ToLower().EndsWith("time") depended on the current culture and missed upper-cased names
/// under cultures such as tr-TR, where "I" lower-cases to a dotless "ı".
/// NOTE(review): the value is passed to the DateTime constructor as .NET ticks, while GetterConverter
/// in this file writes ToUnixTimeNanoseconds() — confirm the two are meant to round-trip.
/// </remarks>
/// <param name="columnName">Column name; names ending in "time" (any casing) get the DateTime converter.</param>
/// <returns>
/// A converter producing a UTC DateTime from the value's Int64 form for "...time" columns,
/// otherwise a converter that returns the value unchanged.
/// </returns>
private Func<object, object> SetterConverter(string columnName) =>
    columnName.EndsWith("time", StringComparison.OrdinalIgnoreCase)
        ? value => new DateTime(Convert.ToInt64(value), DateTimeKind.Utc)
        : value => value;
/// <summary>
/// Appends the columns of one category to the metadata's name, category and data-type lists.
/// </summary>
/// <param name="groupedColumns">Columns grouped by category.</param>
/// <param name="category">Category to append; nothing happens when it has no columns.</param>
/// <param name="metadata">Metadata whose lists are extended.</param>
private void ProcessCategory(IReadOnlyDictionary<ColumnCategory, List<ColumnInfo>> groupedColumns, ColumnCategory category, DeviceMetadata metadata)
{
    if (!groupedColumns.TryGetValue(category, out var members))
    {
        return;
    }

    // The three lists stay index-aligned: one entry per column in each.
    foreach (var info in members)
    {
        metadata.ColumnNames.Add(info.Name);
        metadata.ColumnCategories.Add(info.Category);
        metadata.DataTypes.Add(info.DataType);
    }
}
/// <summary>
/// Immutable description of one mapped column.
/// </summary>
private readonly struct ColumnInfo
{
    public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName)
        => (Name, Category, DataType, IsSingleMeasuring, DeclaredTypeName)
            = (name, category, dataType, isSingleMeasuring, declaredTypeName);

    /// <summary>
    /// Column name.
    /// </summary>
    public string Name { get; }

    /// <summary>
    /// Name of the declared type.
    /// </summary>
    public string DeclaredTypeName { get; }

    /// <summary>
    /// Whether the column is a single-measuring column.
    /// </summary>
    public bool IsSingleMeasuring { get; }

    /// <summary>
    /// Column category.
    /// </summary>
    public ColumnCategory Category { get; }

    /// <summary>
    /// IoTDB data type.
    /// </summary>
    public TSDataType DataType { get; }
}
/// <summary>
/// Maps a type name to the corresponding IoTDB data type.
/// </summary>
/// <param name="typeName">Type name; trimmed and looked up case-insensitively in DataTypeMap.</param>
/// <returns>The mapped TSDataType, or TSDataType.STRING for blank or unknown names.</returns>
private TSDataType GetDataTypeFromTypeName(string typeName)
{
    if (!string.IsNullOrWhiteSpace(typeName)
        && DataTypeMap.TryGetValue(typeName.Trim(), out var mapped))
    {
        return mapped;
    }

    return TSDataType.STRING;
}
/// <summary>
/// Case-insensitive lookup from type name to IoTDB data type.
/// Static so the table is built once instead of once per provider instance.
/// </summary>
private static readonly IReadOnlyDictionary<string, TSDataType> DataTypeMap =
    new Dictionary<string, TSDataType>(StringComparer.OrdinalIgnoreCase)
    {
        ["BOOLEAN"] = TSDataType.BOOLEAN,
        ["INT32"] = TSDataType.INT32,
        ["INT64"] = TSDataType.INT64,
        ["FLOAT"] = TSDataType.FLOAT,
        ["DOUBLE"] = TSDataType.DOUBLE,
        ["TEXT"] = TSDataType.TEXT,
        ["NULLTYPE"] = TSDataType.NONE,
        ["DATETIME"] = TSDataType.TIMESTAMP,
        ["DATE"] = TSDataType.DATE,
        ["BLOB"] = TSDataType.BLOB,
        // Decimal values are stored as strings.
        ["DECIMAL"] = TSDataType.STRING,
        ["STRING"] = TSDataType.STRING
    };
/// <summary>
/// Case-insensitive lookup from type name to a default value.
/// Static so the table is built once instead of once per provider instance.
/// </summary>
private static readonly IReadOnlyDictionary<string, object> DataTypeDefaultValueMap =
    new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
    {
        ["BOOLEAN"] = false,
        ["INT32"] = 0,
        // Boxed as a long so the default matches the 64-bit type it stands for.
        ["INT64"] = 0L,
        ["FLOAT"] = 0.0f,
        ["DOUBLE"] = 0.0d,
        ["TEXT"] = string.Empty,
        ["NULLTYPE"] = null,
        ["DATETIME"] = null,
        ["DATE"] = null,
        ["BLOB"] = null,
        ["DECIMAL"] = "0.0",
        ["STRING"] = string.Empty
    };
/// <summary>
/// Converts a raw value to the .NET type that corresponds to an IoTDB data type.
/// </summary>
/// <param name="tSDataType">IoTDB data type of the column.</param>
/// <param name="value">Raw value.</param>
/// <returns>
/// The converted value: bool, int, long, float, double, string, DateTime or a BLOB payload;
/// null for TSDataType.NONE; the string form for any other type.
/// </returns>
private dynamic GetTSDataValue(TSDataType tSDataType, object value) =>
    tSDataType switch
    {
        TSDataType.BOOLEAN => Convert.ToBoolean(value),
        TSDataType.INT32 => Convert.ToInt32(value),
        TSDataType.INT64 => Convert.ToInt64(value),
        // FLOAT is single precision; it was previously widened to double.
        TSDataType.FLOAT => Convert.ToSingle(value),
        TSDataType.DOUBLE => Convert.ToDouble(value),
        TSDataType.TEXT => Convert.ToString(value),
        TSDataType.NONE => null,
        TSDataType.TIMESTAMP => Convert.ToInt64(value),
        TSDataType.DATE => Convert.ToDateTime(value),
        // A byte[] payload is passed through as-is (Convert.ToByte throws for arrays);
        // anything else keeps the previous single-byte conversion.
        // NOTE(review): assumes BLOB values arrive as byte[] — confirm against the client library.
        TSDataType.BLOB => value is byte[] ? value : (object)Convert.ToByte(value),
        TSDataType.STRING => Convert.ToString(value),
        _ => Convert.ToString(value)
    };
/// <summary>
/// Indexes an accessor's members by their name or path.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="accessor">Entity accessor whose MemberList is indexed.</param>
/// <returns>An ordinal-keyed dictionary; when a path occurs more than once the last member wins.</returns>
private Dictionary<string, EntityMemberInfo> BuildMemberCache<T>(ISourceEntityAccessor<T> accessor)
{
    var membersByPath = new Dictionary<string, EntityMemberInfo>(StringComparer.Ordinal);

    foreach (var entry in accessor.MemberList)
    {
        // The indexer (not Add) is used so a repeated path overwrites the earlier entry.
        membersByPath[entry.NameOrPath] = entry;
    }

    return membersByPath;
}
// Compiled pattern matching strings made up solely of ASCII letters and digits (the empty string also matches).
// NOTE(review): no code in this class as shown references this field.
private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled);
}
}