优化IoTDB驱动,实现30个字段实体的270W任务数据开发环境处理平均耗时在70秒

This commit is contained in:
ChenYi 2025-05-09 17:54:52 +08:00
parent 541c118dbb
commit f6130e1d0b
7 changed files with 378 additions and 377 deletions

View File

@ -573,7 +573,7 @@ namespace JiShe.CollectBus.IncrementalGenerator
$"new EntityMemberInfo(" +
$"\"{prop.Name}.{elementName}\", " +
$"typeof({elementType}), " +
$"\"{elementDeclaredName}\", " +
$"typeof({elementType}).Name, " +//$"\"{elementDeclaredName}\", " +
$"(e) => Get{prop.Name}_{elementName}(({entityType})e), " +
$"(e, v) => Set{prop.Name}_{elementName}(({entityType})e, ({elementType})v))");
}

View File

@ -1,6 +1,7 @@
using JiShe.CollectBus.Common.Attributes;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.IoTDB.Attributes;
using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IoTDB.Model
{
@ -43,5 +44,27 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
/// <summary>
/// 设备路径
/// </summary>
public virtual string DevicePath
{
get
{
return $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
set
{
if (string.IsNullOrWhiteSpace(value))
{
DevicePath = $"root.{SystemName.ToLower()}.`{ProjectId}`.`{DeviceType}`.{DataType}.`{DeviceId}`";
}
else
{
DevicePath = value;
}
}
}
}
}

View File

@ -6,8 +6,18 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary>
/// 设备元数据
/// </summary>
public class DeviceMetadata
public sealed class DeviceMetadata
{
/// <summary>
/// 实体类名称
/// </summary>
public string EntityName { get; set; }
/// <summary>
/// 设备表名或树路径如果实体没有添加TableNameOrTreePath,此处为空
/// </summary>
public string TableNameOrTreePath { get; set; }
/// <summary>
/// 实体类型枚举
/// </summary>
@ -31,6 +41,42 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <summary>
/// 值类型集合用于构建Table的值类型也就是dataTypes参数
/// </summary>
public List<TSDataType> DataTypes { get; } = new();
public List<TSDataType> DataTypes { get; set; } = new();
/// <summary>
/// 列处理信息集合
/// </summary>
public List<ColumnProcessor> Processors { get; } = new List<ColumnProcessor>();
}
/// <summary>
/// 列处理信息结构
/// </summary>
public struct ColumnProcessor
{
/// <summary>
/// 列名
/// </summary>
public string ColumnName;
/// <summary>
/// 值获取委托
/// </summary>
public Func<object, object> ValueGetter;
/// <summary>
/// 类型转换委托
/// </summary>
public Func<object, object> Converter;
/// <summary>
/// 是否单测点
/// </summary>
public bool IsSingleMeasuring;
/// <summary>
/// 单测点名称委托
/// </summary>
public Func<object, object> SingleMeasuringNameGetter;
}
}

View File

@ -22,6 +22,10 @@ using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Exceptions;
using System.Diagnostics.Metrics;
using Newtonsoft.Json.Linq;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Text.RegularExpressions;
namespace JiShe.CollectBus.IoTDB.Provider
{
@ -68,8 +72,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
var metadata = await GetMetadata<T>();
var tablet = BuildTablet(new[] { entity }, metadata);
if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
await CurrentSession.InsertAsync(tablet);
await CurrentSession.InsertAsync(tablet.First());
}
catch (Exception ex)
{
@ -95,7 +104,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, metadata);
await CurrentSession.InsertAsync(tablet);
if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
foreach (var item in tablet)
{
await CurrentSession.InsertAsync(item);
}
}
}
catch (Exception ex)
@ -123,7 +140,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var batch in batches)
{
var tablet = BuildTablet(batch, deviceMetadata);
await CurrentSession.InsertAsync(tablet);
if (tablet == null || tablet.Count <= 0)
{
_logger.LogError($"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时 tablet 为null");
return;
}
foreach (var item in tablet)
{
await CurrentSession.InsertAsync(item);
}
}
}
catch (Exception ex)
@ -181,22 +206,54 @@ namespace JiShe.CollectBus.IoTDB.Provider
var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var columns = CollectColumnMetadata<T>(accessor);
var metadata = BuildDeviceMetadata<T>(columns);
var tmpMetadata = BuildDeviceMetadata<T>(columns, accessor);
string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
if (tableNameOrTreePathAttribute != null)
{
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
}
tmpMetadata.EntityName = accessor.EntityName;
tmpMetadata.EntityType = accessor.EntityType;
tmpMetadata.TableNameOrTreePath = tableNameOrTreePath;
var metaData = MetadataCache.AddOrUpdate(
typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加
addValueFactory: t => tmpMetadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) =>
{
var columns = CollectColumnMetadata(accessor);
var metadata = BuildDeviceMetadata<T>(columns);
var metadata = BuildDeviceMetadata(columns, accessor);
//对现有值 existingValue 进行修改,返回新值
string tableNameOrTreePath = string.Empty;
var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
if (tableNameOrTreePathAttribute != null)
{
tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
}
existingValue.ColumnNames = metadata.ColumnNames;
existingValue.DataTypes = metadata.DataTypes;
return existingValue;
}
);
metadata.EntityType = accessor.EntityType;
//var metaData = MetadataCache.GetOrAdd(typeof(T), type =>
//{
// var columns = CollectColumnMetadata(accessor);
// var metadata = BuildDeviceMetadata(columns, accessor);
// string tableNameOrTreePath = string.Empty;
// var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
// if (tableNameOrTreePathAttribute != null)
// {
// tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
// }
// metadata.EntityName = accessor.EntityName;
// metadata.EntityType = accessor.EntityType;
// metadata.TableNameOrTreePath = tableNameOrTreePath;
// return metadata;
//});
return await Task.FromResult(metaData);
}
@ -251,338 +308,110 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
}
///// <summary>
///// 构建Tablet
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="entities">表实体</param>
///// <param name="metadata">设备元数据</param></param>
///// <returns></returns>
//private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
//{
// var timestamps = new List<long>();
// var values = new List<List<object>>();
// var devicePaths = new HashSet<string>();
// List<string> tempColumnNames = new List<string>();
// tempColumnNames.AddRange(metadata.ColumnNames);
// var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
// var memberCache = new Dictionary<string, EntityMemberInfo>(); // 缓存优化查询
// // 预构建成员缓存Key: NameOrPath
// foreach (var member in accessor.MemberList)
// {
// memberCache[member.NameOrPath] = member;
// }
// if (accessor.EntityType == null || metadata.EntityType == null)
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101");
// }
// if (metadata.EntityType != accessor.EntityType)
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 和{nameof(DeviceMetadata)}的 EntityType 不一致,属于异常情况,-102");
// }
// if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true)
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103");
// }
// else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false)
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
// }
// string tableNameOrTreePath = string.Empty;
// var tableNameOrTreePathAttribute = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
// if (tableNameOrTreePathAttribute != null)
// {
// tableNameOrTreePath = tableNameOrTreePathAttribute.TableNameOrTreePath;
// }
// foreach (var entity in entities)
// {
// timestamps.Add(entity.Timestamps);
// var rowValues = new List<object>();
// foreach (var measurement in metadata.ColumnNames)
// {
// if (!memberCache.TryGetValue(measurement, out var member))
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName}没有找到{measurement}对应的member信息-105");
// }
// var value = member.GetValue(entity);
// // 特性查询优化
// var attributes = member.CustomAttributes ?? Enumerable.Empty<Attribute>();
// var singleMeasuringAttr = attributes.OfType<SingleMeasuringAttribute>().FirstOrDefault();
// if (singleMeasuringAttr != null)//如果是单侧点
// {
// var tupleItem1Key = $"{member.NameOrPath}.Item1";
// if (!memberCache.TryGetValue(tupleItem1Key, out var tuple1Member))
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item1 信息,-106");
// }
// int indexOf = metadata.ColumnNames.IndexOf(measurement);
// tempColumnNames[indexOf] = (string)tuple1Member.GetValue(entity);
// var tupleItem2Key = $"{member.NameOrPath}.Item2";
// if (!memberCache.TryGetValue(tupleItem2Key, out var tuple2Member))
// {
// throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构时{accessor.EntityName} 没有找到{measurement}对应的member Item2 信息,-107");
// }
// value = tuple2Member.GetValue(entity);
// }
// if (value != null)
// {
// var tempValue = member.DeclaredTypeName.ToUpper() switch
// {
// "DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
// _ => value
// };
// rowValues.Add(tempValue);
// }
// else
// {
// rowValues.Add(value);
// }
// }
// values.Add(rowValues);
// //如果指定了路径
// if (!string.IsNullOrWhiteSpace(tableNameOrTreePath))
// {
// devicePaths.Add(tableNameOrTreePath);
// }
// else
// {
// if (!_runtimeContext.UseTableSessionPool)//树模型
// {
// devicePaths.Add(DevicePathBuilder.GetDevicePath(entity));
// }
// else
// {
// devicePaths.Add(DevicePathBuilder.GetTableName<T>());
// }
// }
// }
// if (devicePaths.Count > 1)
// {
// throw new Exception($"{nameof(BuildTablet)} 构建Tablet《{typeof(T).Name}》时,批量插入的设备路径不一致。");
// }
// return _runtimeContext.UseTableSessionPool
// ? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps)
// : BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames,values, timestamps);
//}
/// <summary>
/// 构建Tablet
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entities">表实体集合</param>
/// <param name="entities">表实体</param>
/// <param name="metadata">设备元数据</param></param>
/// <returns></returns>
private Tablet BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
private List<Tablet> BuildTablet<T>(IEnumerable<T> entities, DeviceMetadata metadata) where T : IoTEntity
{
// 前置校验
ValidateMetadataAndAccessor<T>(metadata, out var accessor);
// 初始化数据结构
var (timestamps, values, devicePaths) = (new List<long>(), new List<List<object>>(), new HashSet<string>());
var tempColumnNames = new List<string>(metadata.ColumnNames);
var memberCache = BuildMemberCache(accessor);
var tableNameOrTreePath = GetTableNameOrTreePath<T>();
// 处理每个实体
foreach (var entity in entities)
var entitiyList = entities.ToList();
if (entitiyList == null || entitiyList.Count <= 0)
{
ProcessEntity(entity, accessor, metadata, memberCache, tempColumnNames, timestamps, values);
UpdateDevicePaths(entity, tableNameOrTreePath, devicePaths);
return null;
}
//var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
//var memberCache = BuildMemberCache(accessor);
if (metadata.EntityType == null)
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 没有指定,属于异常情况,-101");
}
if (metadata.EntityType == EntityTypeEnum.Other)
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 {nameof(T)}的EntityType 不属于IoTDB数据模型实体属于异常情况-102");
}
if (metadata.EntityType == EntityTypeEnum.TreeModel && _runtimeContext.UseTableSessionPool == true)
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 tree模型不能使用table模型Session连接属于异常情况-103");
}
else if (metadata.EntityType == EntityTypeEnum.TableModel && _runtimeContext.UseTableSessionPool == false)
{
throw new ArgumentException($"{nameof(BuildTablet)} 构建存储结构{nameof(Tablet)}时 table模型不能使用tree模型Session连接属于异常情况-104");
}
string tableNameOrTreePath = string.Empty;
if (_runtimeContext.UseTableSessionPool)//表模型
{
//如果指定了路径
if (!string.IsNullOrWhiteSpace(metadata.TableNameOrTreePath))
{
tableNameOrTreePath = metadata.TableNameOrTreePath;
}
else
{
tableNameOrTreePath = DevicePathBuilder.GetTableName<T>();
}
return new List<Tablet>() { BuildTablet(entitiyList, metadata, tableNameOrTreePath) };
}
else
{
//树模型的时候实体的设备Id可能会不同因此需要根据不同路径进行存储。
var tabletList = new List<Tablet>();
var groupEntities = entitiyList.GroupBy(d => d.DevicePath).ToList();
foreach (var group in groupEntities)
{
tabletList.Add(BuildTablet(group.ToList(), metadata, group.Key));
}
return tabletList;
}
}
private Tablet BuildTablet<T>(List<T> entities, DeviceMetadata metadata, string tableNameOrTreePath) where T : IoTEntity
{
// 预分配内存结构
var rowCount = entities.Count;
var timestamps = new long[rowCount];
var values = new object[rowCount][];
for (var i = 0; i < values.Length; i++)
{
values[i] = new object[metadata.ColumnNames.Count];
}
List<string> tempColumnNames = new List<string>();
tempColumnNames.AddRange(metadata.ColumnNames);
// 顺序处理数据(保证线程安全)
for (var row = 0; row < rowCount; row++)
{
var entity = entities[row];
timestamps[row] = entity.Timestamps;
for (int i = 0; i < metadata.ColumnNames.Count; i++)
{
var processor = metadata.Processors[i];
if (processor.IsSingleMeasuring)
{
tempColumnNames[i] = (string)processor.SingleMeasuringNameGetter(entity);
}
// 获取并转换值
values[row][i] = processor.ValueGetter(entity);
}
}
// 后置校验与返回
ValidateDevicePaths(devicePaths);
// return CreateFinalTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps);
return _runtimeContext.UseTableSessionPool
? BuildTableSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps)
: BuildSessionTablet(metadata, devicePaths.First(), tempColumnNames, values, timestamps);
? BuildTableSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList())
: BuildSessionTablet(metadata, tableNameOrTreePath, tempColumnNames, values.Select(d => d.ToList()).ToList(), timestamps.ToList());
}
private void ValidateMetadataAndAccessor<T>(DeviceMetadata metadata, out ISourceEntityAccessor<T> accessor) where T : IoTEntity
{
accessor = SourceEntityAccessorFactory.GetAccessor<T>();
if (accessor.EntityType == null || metadata.EntityType == null)
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时EntityType未指定", -101);
}
if (metadata.EntityType != accessor.EntityType)
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时EntityType不一致", -102);
}
bool isTableModel = accessor.EntityType == EntityTypeEnum.TableModel;
if (_runtimeContext.UseTableSessionPool != isTableModel)
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时Session类型不匹配: 预期{(isTableModel ? "Table" : "Tree")}模型", isTableModel ? -104 : -103);
}
}
/// <summary>
/// 处理实体并获取值
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entity"></param>
/// <param name="accessor"></param>
/// <param name="metadata"></param>
/// <param name="memberCache"></param>
/// <param name="tempColumnNames"></param>
/// <param name="timestamps"></param>
/// <param name="values"></param>
/// <exception cref="IoTException"></exception>
private void ProcessEntity<T>(
T entity,
ISourceEntityAccessor<T> accessor,
DeviceMetadata metadata,
Dictionary<string, EntityMemberInfo> memberCache,
List<string> tempColumnNames,
List<long> timestamps,
List<List<object>> values) where T : IoTEntity
{
timestamps.Add(entity.Timestamps);
var rowValues = new object[metadata.ColumnNames.Count];
Parallel.ForEach(metadata.ColumnNames, (measurement, state, index) =>
{
if (!memberCache.TryGetValue(measurement, out var member))
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时找不到成员: {measurement}", -105);
}
object value = ResolveMemberValue(entity, member, memberCache, tempColumnNames, (int)index);
rowValues[index] = ConvertValueByType(member, value);
});
values.Add(rowValues.ToList());
}
private object ResolveMemberValue<T>(
T entity,
EntityMemberInfo member,
Dictionary<string, EntityMemberInfo> memberCache,
List<string> tempColumnNames,
int columnIndex) where T : IoTEntity
{
// 单测点逻辑
if (member.CustomAttributes?.OfType<SingleMeasuringAttribute>().FirstOrDefault() is { } attr)
{
var tuple1Key = $"{member.NameOrPath}.Item1";
var tuple2Key = $"{member.NameOrPath}.Item2";
if (!memberCache.TryGetValue(tuple1Key, out var tuple1) || !memberCache.TryGetValue(tuple2Key, out var tuple2))
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时单侧点元组成员缺失", -106);
}
tempColumnNames[columnIndex] = (string)tuple1.GetValue(entity);
return tuple2.GetValue(entity);
}
return member.GetValue(entity);
}
/// <summary>
/// 设置实体的成员值
/// </summary>
/// <param name="member"></param>
/// <param name="value"></param>
/// <returns></returns>
private object ConvertValueByType(EntityMemberInfo member, object value)
{
return member.DeclaredTypeName switch
{
"DATETIME" => Convert.ToDateTime(value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value
};
}
/// <summary>
/// 处理设备路径
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entity"></param>
/// <param name="tableNameOrTreePath"></param>
/// <param name="devicePaths"></param>
private void UpdateDevicePaths<T>(
T entity,
string tableNameOrTreePath,
HashSet<string> devicePaths) where T : IoTEntity
{
if (!string.IsNullOrEmpty(tableNameOrTreePath))
{
devicePaths.Add(tableNameOrTreePath);
return;
}
var path = _runtimeContext.UseTableSessionPool
? DevicePathBuilder.GetTableName<T>()
: DevicePathBuilder.GetDevicePath(entity);
devicePaths.Add(path);
}
/// <summary>
/// 验证设备路径
/// </summary>
/// <param name="devicePaths"></param>
private void ValidateDevicePaths(HashSet<string> devicePaths)
{
if (devicePaths.Count == 0)
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时设备路径集合为空", -108);
}
if (devicePaths.Count > 1)
{
var paths = string.Join(", ", devicePaths.Take(3));
{
throw new IoTException($"{nameof(BuildTablet)} 构建IoTDB数据结构时设备路径不一致。检测到路径: {paths}...", -109);
}
}
}
/// <summary>
/// 缓存优化:避免重复反射
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
private string GetTableNameOrTreePath<T>()
{
return AttributeCache<T>.TableNameOrTreePath;
}
/// <summary>
/// 特性缓存辅助类
/// </summary>
/// <typeparam name="T"></typeparam>
private static class AttributeCache<T>
{
public static readonly string TableNameOrTreePath;
static AttributeCache()
{
var attr = typeof(T).GetCustomAttribute<TableNameOrTreePathAttribute>();
TableNameOrTreePath = attr?.TableNameOrTreePath ?? string.Empty;
}
}
/// <summary>
/// 构建tree模型的Tablet
@ -615,7 +444,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="values">数据集合</param>
/// <param name="timestamps">时间戳集合</param>
/// <returns></returns>
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<string> columns,List<List<object>> values, List<long> timestamps)
private Tablet BuildTableSessionTablet(DeviceMetadata metadata, string tableName, List<string> columns, List<List<object>> values, List<long> timestamps)
{
var tablet = new Tablet(
tableName,
@ -825,15 +654,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
ColumnInfo? column = null;
if (tagAttr != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false);
column = new ColumnInfo(member.NameOrPath, ColumnCategory.TAG, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
}
else if (attrColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false);
column = new ColumnInfo(member.NameOrPath, ColumnCategory.ATTRIBUTE, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
}
else if (fieldColumn != null)
{
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false);
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(declaredTypeName), false, member.DeclaredTypeName);
}
// 单测模式处理
@ -844,7 +673,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
throw new Exception($"{nameof(CollectColumnMetadata)} {accessor.EntityName} {member.NameOrPath} 单侧点属性解析异常");
}
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true);
column = new ColumnInfo(member.NameOrPath, ColumnCategory.FIELD, GetDataTypeFromTypeName(tupleMember.DeclaredTypeName), true, tupleMember.DeclaredTypeName);
}
if (column.HasValue) columns.Add(column.Value);
@ -858,7 +687,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <param name="typeInfo">待解析的类</param>
/// <param name="columns">已处理好的数据列</param>
/// <returns></returns>
private DeviceMetadata BuildDeviceMetadata<T>(List<ColumnInfo> columns) where T : IoTEntity
private DeviceMetadata BuildDeviceMetadata<T>(List<ColumnInfo> columns, ISourceEntityAccessor<T> accessor) where T : IoTEntity
{
var metadata = new DeviceMetadata();
@ -877,9 +706,97 @@ namespace JiShe.CollectBus.IoTDB.Provider
ProcessCategory(groupedColumns, ColumnCategory.ATTRIBUTE, metadata);
ProcessCategory(groupedColumns, ColumnCategory.FIELD, metadata);
// 新增处理器初始化
foreach (var item in metadata.ColumnNames)
{
ColumnInfo column = columns.FirstOrDefault(d => d.Name == item);
var processor = new ColumnProcessor
{
ColumnName = column.Name,
IsSingleMeasuring = column.IsSingleMeasuring,
Converter = CreateConverter(column.DeclaredTypeName.ToUpper())
};
// 处理单测点
if (column.IsSingleMeasuring)
{
var item1Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item1");
processor.SingleMeasuringNameGetter = (obj) =>
{
// 获取原始值并转为字符串
object rawValue = item1Member.Getter(obj);
string value = rawValue?.ToString();
if (!string.IsNullOrEmpty(value))
{
// 规则1: 严格检查ASCII字母和数字0-9, A-Z, a-z
bool hasInvalidChars = value.Any(c =>
!((c >= 'A' && c <= 'Z') ||
(c >= 'a' && c <= 'z') ||
(c >= '0' && c <= '9')));
// 规则2: 首字符不能是数字
bool startsWithDigit = value[0] >= '0' && value[0] <= '9';
// 规则3: 全字符串不能都是数字
bool allDigits = value.All(c => c >= '0' && c <= '9');
// 按优先级抛出具体异常
if (hasInvalidChars)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字");
}
else if (startsWithDigit)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能以数字开头");
}
else if (allDigits)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能全为数字");
}
}
return value;
};
var item2Member = accessor.MemberList
.First(m => m.NameOrPath == $"{column.Name}.Item2");
processor.ValueGetter = (obj) => {
object rawValue = item2Member.Getter(obj);
return processor.Converter(rawValue);
};
}
else
{
// 获取对应的成员访问器
var member = accessor.MemberList.First(m => m.NameOrPath == column.Name);
processor.ValueGetter = (obj) => {
object rawValue = member.Getter(obj);
return processor.Converter(rawValue);
};
}
metadata.Processors.Add(processor);
}
return metadata;
}
private Func<object, object> CreateConverter(string declaredTypeName)
{
return declaredTypeName switch
{
"DATETIME" => value => ((DateTime)value).GetDateTimeOffset().ToUnixTimeNanoseconds(),
_ => value => value
};
}
/// <summary>
/// 处理不同列类型的逻辑
/// </summary>
@ -906,6 +823,11 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// </summary>
public string Name { get; }
/// <summary>
/// 声明的类型的名称
/// </summary>
public string DeclaredTypeName { get; }
/// <summary>
/// 是否是单测点
/// </summary>
@ -921,12 +843,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// </summary>
public TSDataType DataType { get; }
public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring)
public ColumnInfo(string name, ColumnCategory category, TSDataType dataType, bool isSingleMeasuring, string declaredTypeName)
{
Name = name;
Category = category;
DataType = dataType;
IsSingleMeasuring = isSingleMeasuring;
DeclaredTypeName = declaredTypeName;
}
}
@ -1023,5 +946,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
return cache;
}
private static readonly Regex _asciiAlphanumericRegex = new Regex(@"^[a-zA-Z0-9]*$", RegexOptions.Compiled);
}
}

View File

@ -140,13 +140,13 @@ namespace JiShe.CollectBus.DataChannels
// 批量写入数据库
await _dbProvider.BatchInsertAsync(metadata, records);
//// 限流推送Kafka
//await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
// items: records,
// deviceIdSelector: data => data.DeviceId,
// processor: async (data, groupIndex) =>
// await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
//);
// 限流推送Kafka
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: records,
deviceIdSelector: data => data.DeviceId,
processor: async (data, groupIndex) =>
await KafkaProducerIssuedMessageAction(topicName, data, groupIndex)
);
}
catch (Exception ex)
{

View File

@ -184,14 +184,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
time = DateTime.Now;
//System.Reflection.PropertyInfo;
//System.Reflection.FieldInfo
var meter = new TreeModelSingleMeasuringEntity<string>()
var meter = new TreeModelSingleMeasuringEntity<DateTime>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "1",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, value)
SingleMeasuring = (measuring, time)
};
await _iotDBProvider.InsertAsync(meter);
}

View File

@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
//return;
//return;
// 创建取消令牌源
//var cts = new CancellationTokenSource();
@ -1635,36 +1635,43 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
{
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
return new MeterReadingTelemetryPacketInfo()
try
{
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = timestamps,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime,
MeterAddress = ammeterInfo.AmmerterAddress,
PacketType = (int)packetType,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
FocusId = ammeterInfo.FocusId,
FocusAddress = ammeterInfo.FocusAddress,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
return new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = timestamps,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime,
MeterAddress = ammeterInfo.AmmerterAddress,
PacketType = (int)packetType,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
FocusId = ammeterInfo.FocusId,
FocusAddress = ammeterInfo.FocusAddress,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = Guid.NewGuid().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
}
catch (Exception ex)
{
throw ex;
}
}
#endregion