修复IoTDB查询映射类型异常的问题

This commit is contained in:
ChenYi 2025-05-12 15:14:19 +08:00
parent 80b8942d9d
commit eaf6f22bdc
6 changed files with 200 additions and 86 deletions

View File

@ -1,12 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IoTDB.Model
{
internal class Class1
{
}
}

View File

@ -60,14 +60,29 @@ namespace JiShe.CollectBus.IoTDB.Provider
public string ColumnName; public string ColumnName;
/// <summary> /// <summary>
/// 值获取委托 /// 数据类型
/// </summary>
public TSDataType TSDataType { get; set;}
/// <summary>
/// 值获取委托(参数:实体对象)
/// </summary> /// </summary>
public Func<object, object> ValueGetter; public Func<object, object> ValueGetter;
/// <summary>
/// 值设置委托(参数:实体对象,新值)
/// </summary>
public Action<object, object> ValueSetter;
/// <summary> /// <summary>
/// 类型转换委托 /// 类型转换委托
/// </summary> /// </summary>
public Func<object, object> Converter; public Func<object, object> GetConverter;
/// <summary>
/// 类型转换委托
/// </summary>
public Func<object, object> SetConverter;
/// <summary> /// <summary>
/// 是否单测点 /// 是否单测点

View File

@ -274,7 +274,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
var result = new BusPagedResult<T> var result = new BusPagedResult<T>
{ {
TotalCount = await GetTotalCount<T>(options), TotalCount = await GetTotalCount<T>(options),
@ -284,7 +283,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}; };
stopwatch2.Stop(); stopwatch2.Stop();
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
//int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize); //int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize);
if (result.Items.Count() < result.PageSize) if (result.Items.Count() < result.PageSize)
@ -467,7 +466,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{ {
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var sb = new StringBuilder("SELECT TIME as Timestamps,"); var sb = new StringBuilder("SELECT ");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}"); sb.Append($" FROM {options.TableNameOrTreePath}");
@ -578,11 +577,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
var accessor = SourceEntityAccessorFactory.GetAccessor<T>(); var accessor = SourceEntityAccessorFactory.GetAccessor<T>();
var memberCache = BuildMemberCache(accessor); var memberCache = BuildMemberCache(accessor);
var columns = new List<string>() { "Timestamps" };
var dataTypes = new List<TSDataType>() { TSDataType.TIMESTAMP };
columns.AddRange(metadata.ColumnNames);
dataTypes.AddRange(metadata.DataTypes);
while (dataSet.HasNext() && results.Count < pageSize) while (dataSet.HasNext() && results.Count < pageSize)
{ {
@ -592,28 +586,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
Timestamps = record.Timestamps Timestamps = record.Timestamps
}; };
foreach (var measurement in columns) for (int i = 0; i < metadata.Processors.Count; i++)
{ {
int indexOf = columns.IndexOf(measurement); var value = record.Values[i];
var value = record.Values[indexOf]; metadata.Processors[i].ValueSetter(entity, value);
TSDataType tSDataType = dataTypes[indexOf];
if (!memberCache.TryGetValue(measurement, out var member) && !(value is System.DBNull))
{
throw new Exception($"{nameof(ParseResults)} 解析查询结果 {accessor.EntityName} 属性赋值出现异常,没有找到{measurement}对应的 member信息");
}
dynamic tempValue = GetTSDataValue(tSDataType, value);
if (measurement.ToLower().EndsWith("time"))
{
member.Setter(entity, TimestampHelper.ConvertToDateTime(tempValue, TimestampUnit.Nanoseconds));
}
else
{
member.Setter(entity, tempValue);
}
} }
results.Add(entity); results.Add(entity);
@ -715,7 +691,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
ColumnName = column.Name, ColumnName = column.Name,
IsSingleMeasuring = column.IsSingleMeasuring, IsSingleMeasuring = column.IsSingleMeasuring,
Converter = CreateConverter(column.DeclaredTypeName.ToUpper()) GetConverter = GetterConverter(column.DeclaredTypeName.ToUpper()),
SetConverter = SetterConverter(column.Name.ToUpper()),
TSDataType = column.DataType,
}; };
// 处理单测点 // 处理单测点
@ -730,37 +708,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
object rawValue = item1Member.Getter(obj); object rawValue = item1Member.Getter(obj);
string value = rawValue?.ToString(); string value = rawValue?.ToString();
if (!string.IsNullOrWhiteSpace(value)) ValidateSingleMeasuringName(value);
{
// 规则1: 严格检查ASCII字母和数字0-9, A-Z, a-z
bool hasInvalidChars = value.Any(c =>
!((c >= 'A' && c <= 'Z') ||
(c >= 'a' && c <= 'z') ||
(c >= '0' && c <= '9')));
// 规则2: 首字符不能是数字
bool startsWithDigit = value[0] >= '0' && value[0] <= '9';
// 规则3: 全字符串不能都是数字
bool allDigits = value.All(c => c >= '0' && c <= '9');
// 按优先级抛出具体异常
if (hasInvalidChars)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字");
}
else if (startsWithDigit)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能以数字开头");
}
else if (allDigits)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能全为数字");
}
}
return value; return value;
}; };
@ -769,7 +717,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
.First(m => m.NameOrPath == $"{column.Name}.Item2"); .First(m => m.NameOrPath == $"{column.Name}.Item2");
processor.ValueGetter = (obj) => { processor.ValueGetter = (obj) => {
object rawValue = item2Member.Getter(obj); object rawValue = item2Member.Getter(obj);
return processor.Converter(rawValue); return processor.GetConverter(rawValue);
}; };
} }
else else
@ -778,7 +726,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
var member = accessor.MemberList.First(m => m.NameOrPath == column.Name); var member = accessor.MemberList.First(m => m.NameOrPath == column.Name);
processor.ValueGetter = (obj) => { processor.ValueGetter = (obj) => {
object rawValue = member.Getter(obj); object rawValue = member.Getter(obj);
return processor.Converter(rawValue); return processor.GetConverter(rawValue);
};
//对应的属性成员进行赋值
processor.ValueSetter = (obj, value) =>
{
dynamic tempValue = GetTSDataValue(processor.TSDataType, value);
var rawValue = processor.SetConverter(value);
member.Setter(obj, rawValue);
}; };
} }
@ -788,7 +744,52 @@ namespace JiShe.CollectBus.IoTDB.Provider
return metadata; return metadata;
} }
private Func<object, object> CreateConverter(string declaredTypeName) /// <summary>
/// 验证单测点名称格式
/// </summary>
private void ValidateSingleMeasuringName(string value)
{
if (string.IsNullOrWhiteSpace(value))
{
return;
}
// 规则1: 严格检查ASCII字母和数字0-9, A-Z, a-z
bool hasInvalidChars = value.Any(c =>
!((c >= 'A' && c <= 'Z') ||
(c >= 'a' && c <= 'z') ||
(c >= '0' && c <= '9')));
// 规则2: 首字符不能是数字
bool startsWithDigit = value[0] >= '0' && value[0] <= '9';
// 规则3: 全字符串不能都是数字
bool allDigits = value.All(c => c >= '0' && c <= '9');
// 按优先级抛出具体异常
if (hasInvalidChars)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 包含非法字符,只允许字母和数字");
}
else if (startsWithDigit)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能以数字开头");
}
else if (allDigits)
{
throw new InvalidOperationException(
$"SingleMeasuring name '{value}' 不能全为数字");
}
}
/// <summary>
/// 取值的处理器
/// </summary>
/// <param name="declaredTypeName"></param>
/// <returns></returns>
private Func<object, object> GetterConverter(string declaredTypeName)
{ {
return declaredTypeName switch return declaredTypeName switch
{ {
@ -797,6 +798,16 @@ namespace JiShe.CollectBus.IoTDB.Provider
}; };
} }
/// <summary>
/// 设置值的处理
/// </summary>
/// <param name="columnName"></param>
/// <returns></returns>
private Func<object, object> SetterConverter(string columnName) =>
columnName.ToLower().EndsWith("time")
? value => new DateTime(Convert.ToInt64(value), DateTimeKind.Utc)
: value => value;
/// <summary> /// <summary>
/// 处理不同列类型的逻辑 /// 处理不同列类型的逻辑
/// </summary> /// </summary>

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Ammeters; using FreeSql.Internal.CommonProvider;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.DeviceBalanceControl;
@ -10,7 +11,10 @@ using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
@ -27,6 +31,7 @@ using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using TouchSocket.Core; using TouchSocket.Core;
using TouchSocket.Sockets; using TouchSocket.Sockets;
using static IdentityModel.ClaimComparer;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -205,18 +210,63 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time) public async Task TestTreeModelSingleMeasuringEntity2(string measuring, int value, DateTime time)
{ {
time = DateTime.Now; time = DateTime.Now;
var meter = new TreeModelSingleMeasuringEntity<int>() var meter = new TreeModelSingleMeasuringEntity<bool>()
{ {
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = (measuring, value) SingleMeasuring = (measuring, true)
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
/// <summary>
/// 测试树模型单个测点数据项查询
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestTreeModelSingleMeasuringEntityQuery()
{
var time = DateTime.Now;
var meter = new TreeModelSingleMeasuringEntity<bool>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = ("measuring", true)
};
QueryCondition conditions = new QueryCondition()
{
Field = "DeviceId",
Operator = "=",
Value = meter.DeviceId
};
var query = new IoTDBQueryOptions()
{
TableNameOrTreePath = meter.DevicePath,
PageIndex = 1,
PageSize = 1,
Conditions = new List<QueryCondition>() { conditions },
};
var pageResult = await _iotDBProvider.QueryAsync<DeviceTreeModelDataInfo>(query);
await _iotDBProvider.InsertAsync(meter);
}
/// <summary> /// <summary>
/// 测试表模型单个测点数据项 /// 测试表模型单个测点数据项
/// </summary> /// </summary>
@ -250,14 +300,14 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
{ {
time = DateTime.Now; time = DateTime.Now;
var meter = new TableModelSingleMeasuringEntity<bool>() var meter = new TableModelSingleMeasuringEntity<int>()
{ {
SystemName = "energy", SystemName = "energy",
DeviceId = "402440506", DeviceId = "402440506",
DeviceType = "Ammeter", DeviceType = "Ammeter",
ProjectId = "10059", ProjectId = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(), Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleColumn = (measuring, true) SingleColumn = (measuring, value)
}; };
_dbContext.UseTableSessionPool = true; _dbContext.UseTableSessionPool = true;
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
@ -462,5 +512,33 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
// 测试不管是否上线都ACK // 测试不管是否上线都ACK
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
/// <summary>
/// 测试Redis批量读取10万条数据性能
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task TestRedisCacheGetData(string scores)
{
var timeDensity = "15";
string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus5";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var page = await _redisDataCacheService.GetSingleData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
"973219481:17",
pageSize: 1000,
lastScore: 100,
lastMember: "memberId",
descending: true
);
await Task.CompletedTask;
}
} }

View File

@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
//return; return;
// 创建取消令牌源 // 创建取消令牌源
//var cts = new CancellationTokenSource(); //var cts = new CancellationTokenSource();

View File

@ -0,0 +1,22 @@
using JiShe.CollectBus.Analyzers.Shared;
using JiShe.CollectBus.IoTDB.Attributes;
using JiShe.CollectBus.IoTDB.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.Devices
{
/// <summary>
/// 设备树模型数据信息
/// </summary>
[SourceAnalyzers(EntityTypeEnum.TableModel)]
public class DeviceTreeModelDataInfo: IoTEntity
{
[FIELDColumn]
public bool xfdsa { get; set; }
}
}