优化IoTDB驱动

This commit is contained in:
ChenYi 2025-04-21 14:20:49 +08:00
parent 3c6a04f947
commit ca1a23fd33
13 changed files with 214 additions and 188 deletions

View File

@ -1,6 +1,6 @@
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
namespace JiShe.CollectBus.IoTDB.Interface namespace JiShe.CollectBus.IoTDB.Interface
{ {

View File

@ -6,7 +6,6 @@
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<!--<PackageReference Include="Apache.IoTDB" Version="1.3.3.1" />-->
<PackageReference Include="Apache.IoTDB" Version="2.0.2" /> <PackageReference Include="Apache.IoTDB" Version="2.0.2" />
<PackageReference Include="Volo.Abp" Version="8.3.3" /> <PackageReference Include="Volo.Abp" Version="8.3.3" />
</ItemGroup> </ItemGroup>

View File

@ -1,9 +1,9 @@
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
namespace JiShe.CollectBus.IoTDB.Provider namespace JiShe.CollectBus.IoTDB.Model
{ {
/// <summary> /// <summary>
/// IoT实体基类 /// IoT实体基类,此类适用于多个数据测点记录场景,单个测点请使用子类 SingleMeasuringEntity
/// </summary> /// </summary>
public abstract class IoTEntity public abstract class IoTEntity
{ {
@ -32,7 +32,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
public required string DeviceId { get; set; } public required string DeviceId { get; set; }
/// <summary> /// <summary>
/// 当前时间戳,单位毫秒 /// 当前时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary> /// </summary>
public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); public required long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
} }

View File

@ -4,19 +4,21 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Provider;
namespace JiShe.CollectBus.IotSystems.AFNEntity namespace JiShe.CollectBus.IoTDB.Model
{ {
/// <summary> /// <summary>
/// AFN单项数据实体 /// 单项数据实体
/// </summary> /// </summary>
public class SingleMeasuringAFNDataEntity<T> : IoTEntity [EntityType(EntityTypeEnum.TreeModel)]
public class SingleMeasuringEntity<T> : IoTEntity
{ {
/// <summary> /// <summary>
/// 单项数据对象 /// 单项数据对象
/// </summary> /// </summary>
[SingleMeasuring(nameof(SingleMeasuring))] [SingleMeasuring(nameof(SingleMeasuring))]
public Tuple<string, T> SingleMeasuring { get; set; } public required Tuple<string, T> SingleMeasuring { get; set; }
} }
} }

View File

@ -1,4 +1,6 @@
namespace JiShe.CollectBus.IoTDB.Provider using JiShe.CollectBus.IoTDB.Model;
namespace JiShe.CollectBus.IoTDB.Provider
{ {
/// <summary> /// <summary>
/// 设备路径构建器 /// 设备路径构建器

View File

@ -8,8 +8,10 @@ using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Entities;
namespace JiShe.CollectBus.IoTDB.Provider namespace JiShe.CollectBus.IoTDB.Provider
@ -57,12 +59,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
var tablet = BuildTablet(new[] { entity }, metadata); var tablet = BuildTablet(new[] { entity }, metadata);
await CurrentSession.InsertAsync(tablet); await CurrentSession.InsertAsync(tablet);
//int result = await _currentSession.InsertAsync(tablet);
//if (result <= 0)
//{
// _logger.LogError($"{typeof(T).Name}插入数据没有成功");
//}
} }
/// <summary> /// <summary>
@ -81,11 +77,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
var tablet = BuildTablet(batch, metadata); var tablet = BuildTablet(batch, metadata);
await CurrentSession.InsertAsync(tablet); await CurrentSession.InsertAsync(tablet);
//var result = await _currentSession.InsertAsync(tablet);
//if (result <= 0)
//{
// _logger.LogWarning($"{typeof(T).Name} 批量插入数据第{batch}批次没有成功,共{batches}批次。");
//}
} }
} }
@ -212,18 +203,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
{ {
rowValues.Add(value); rowValues.Add(value);
//if (value != null)
//{
// rowValues.Add(value);
//}
//else
//{
// ////填充默认数据值
// //DataTypeDefaultValueMap.TryGetValue(propertyInfo.PropertyType.Name, out object defaultValue);
// rowValues.Add(null);
//}
} }
} }
@ -434,23 +413,9 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
private DeviceMetadata GetMetadata<T>() where T : IoTEntity private DeviceMetadata GetMetadata<T>() where T : IoTEntity
{ {
if (_runtimeContext.UseTableSessionPool)//表模型
{
return _metadataCache.GetOrAdd(typeof(T), type =>
{
var columns = CollectColumnMetadata(type);
var metadata = BuildDeviceMetadata<T>(columns);
return metadata;
});
}
else
{
// 树模型
var columns = CollectColumnMetadata(typeof(T)); var columns = CollectColumnMetadata(typeof(T));
var metadata = BuildDeviceMetadata<T>(columns); var metadata = BuildDeviceMetadata<T>(columns);
return MetadataCache.AddOrUpdate(
return _metadataCache.AddOrUpdate(
typeof(T), typeof(T),
addValueFactory: t => metadata, // 如果键不存在,用此值添加 addValueFactory: t => metadata, // 如果键不存在,用此值添加
updateValueFactory: (t, existingValue) => updateValueFactory: (t, existingValue) =>
@ -464,7 +429,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
); );
} }
}
/// <summary> /// <summary>
/// 获取设备元数据的列 /// 获取设备元数据的列
@ -477,21 +441,36 @@ namespace JiShe.CollectBus.IoTDB.Provider
foreach (var prop in type.GetProperties()) foreach (var prop in type.GetProperties())
{ {
string typeName = string.Empty;
Type declaredType = prop.PropertyType;
// 处理可空类型
if (declaredType.IsGenericType && declaredType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
Type underlyingType = Nullable.GetUnderlyingType(declaredType);
typeName = underlyingType.Name;
}
else
{
typeName = declaredType.Name;
}
//先获取Tag标签和属性标签 //先获取Tag标签和属性标签
ColumnInfo? column = prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? new ColumnInfo( ColumnInfo? column = prop.GetCustomAttribute<TAGColumnAttribute>() is not null ? new ColumnInfo(
name: prop.Name, name: prop.Name,
category: ColumnCategory.TAG, category: ColumnCategory.TAG,
dataType: GetDataTypeFromTypeName(prop.PropertyType.Name), dataType: GetDataTypeFromTypeName(typeName),
false false
) : prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null ? new ColumnInfo( ) : prop.GetCustomAttribute<ATTRIBUTEColumnAttribute>() is not null ? new ColumnInfo(
prop.Name, prop.Name,
ColumnCategory.ATTRIBUTE, ColumnCategory.ATTRIBUTE,
GetDataTypeFromTypeName(prop.PropertyType.Name), GetDataTypeFromTypeName(typeName),
false false
) : prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? new ColumnInfo( ) : prop.GetCustomAttribute<FIELDColumnAttribute>() is not null ? new ColumnInfo(
prop.Name, prop.Name,
ColumnCategory.FIELD, ColumnCategory.FIELD,
GetDataTypeFromTypeName(prop.PropertyType.Name), GetDataTypeFromTypeName(typeName),
false) false)
: null; : null;

View File

@ -56,7 +56,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var result = await _sessionPool.InsertAlignedTabletAsync(tablet); var result = await _sessionPool.InsertAlignedTabletAsync(tablet);
if (result != 0) if (result != 0)
{ {
throw new Exception($"{nameof(TableSessionPoolAdapter)} "); throw new Exception($"{nameof(SessionPoolAdapter)} Tree模型数据入库没有成功返回结果为{result}");
} }
return result; return result;

View File

@ -54,7 +54,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
var result = await _sessionPool.InsertAsync(tablet); var result = await _sessionPool.InsertAsync(tablet);
if (result != 0) if (result != 0)
{ {
throw new Exception($"{nameof(TableSessionPoolAdapter)} "); throw new Exception($"{nameof(TableSessionPoolAdapter)} table模型数据入库没有成功返回结果为{result}");
} }
return result; return result;

View File

@ -1,33 +1,27 @@
using System; using JiShe.CollectBus.Ammeters;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.IoTDB.DataStructure;
using Apache.IoTDB;
using Confluent.Kafka;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IotSystems.PrepayModel;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using System.Diagnostics.Metrics;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json;
using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Consts;
using System.Diagnostics; using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -52,21 +46,11 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
/// <summary> /// <summary>
/// 测试 UseSessionPool /// 测试 UseSessionPool
/// </summary> /// </summary>
/// <param name="timestamps"></param> /// <param name="testTime"></param>
/// <returns></returns> /// <returns></returns>
[HttpGet] [HttpGet]
public async Task UseSessionPool(long timestamps) public async Task UseSessionPool(DateTime testTime)
{ {
string? messageHexString = null;
if (timestamps == 0)
{
timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_logger.LogError($"timestamps_{timestamps}");
}
else
{
messageHexString = messageHexString + timestamps;
}
ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel() ElectricityMeterTreeModel meter = new ElectricityMeterTreeModel()
{ {
@ -77,8 +61,8 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectCode = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
IssuedMessageHexString = messageHexString, IssuedMessageHexString = "messageHexString",
Timestamps = timestamps, Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -88,10 +72,9 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
[HttpGet] [HttpGet]
public async Task UseTableSessionPool() public async Task UseTableSessionPool(DateTime time)
{ {
var testTime = Convert.ToDateTime("2025-04-21 08:35:55"); var testTime = time;
ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel() ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel()
{ {
SystemName = "energy", SystemName = "energy",
@ -101,7 +84,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectCode = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter2); await _iotDBProvider.InsertAsync(meter2);
@ -117,10 +100,39 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
MeterModel = "DDZY-1980", MeterModel = "DDZY-1980",
ProjectCode = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试Session切换3
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task UseTableSessionPool3(DateTime time)
{
var testTime = time;
ElectricityMeterTreeModel meter2 = new ElectricityMeterTreeModel()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
Current = 10,
MeterModel = "DDZY-1980",
ProjectCode = "10059",
Voltage = 10,
IssuedMessageHexString = "dsdfsfd",
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
};
await _iotDBProvider.InsertAsync(meter2);
_dbContext.UseTableSessionPool = true;
ElectricityMeter meter3 = new ElectricityMeter() ElectricityMeter meter3 = new ElectricityMeter()
{ {
SystemName = "energy", SystemName = "energy",
@ -131,7 +143,48 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
ProjectCode = "10059", ProjectCode = "10059",
Voltage = 10, Voltage = 10,
Currentd = 22, Currentd = 22,
Timestamps = new DateTimeOffset(testTime).ToUnixTimeMilliseconds(), IssuedMessageHexString = "dsdfsfd",
Timestamps = testTime.GetDateTimeOffset().ToUnixTimeMilliseconds(),
};
await _iotDBProvider.InsertAsync(meter3);
}
/// <summary>
/// 测试单个测点数据项
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestSingleMeasuringAFNData(string measuring, string value, DateTime time)
{
var meter = new SingleMeasuringEntity<string>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, string>(measuring, value)
};
await _iotDBProvider.InsertAsync(meter);
}
/// <summary>
/// 测试单个测点数据项2
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestSingleMeasuringAFNData2(string measuring, int value, DateTime time)
{
var meter = new SingleMeasuringEntity<int>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectCode = "10059",
Timestamps = time.GetDateTimeOffset().ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, int>(measuring, value)
}; };
await _iotDBProvider.InsertAsync(meter); await _iotDBProvider.InsertAsync(meter);
} }
@ -188,26 +241,6 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
} }
/// <summary>
/// 测试单个测点数据项
/// </summary>
/// <param name="measuring"></param>
/// <returns></returns>
[HttpGet]
public async Task TestSingleMeasuringAFNData(string measuring, string value)
{
var meter = new SingleMeasuringAFNDataEntity<string>()
{
SystemName = "energy",
DeviceId = "402440506",
DeviceType = "Ammeter",
ProjectCode = "10059",
Timestamps = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
SingleMeasuring = new Tuple<string, string>(measuring, value)
};
await _iotDBProvider.InsertAsync(meter);
}
/// <summary> /// <summary>
/// 测试Redis批量读取10万条数据性能 /// 测试Redis批量读取10万条数据性能
/// </summary> /// </summary>

View File

@ -12,7 +12,6 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Cassandra; using JiShe.CollectBus.Cassandra;

View File

@ -5,7 +5,7 @@ using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Attribute; using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Model;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {

View File

@ -1,11 +1,6 @@
using System; using JiShe.CollectBus.IoTDB.Attribute;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using JiShe.CollectBus.IoTDB.Attribute;
using JiShe.CollectBus.IoTDB.Enums; using JiShe.CollectBus.IoTDB.Enums;
using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IoTDB.Model;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {

View File

@ -233,5 +233,22 @@ namespace JiShe.CollectBus.Common.Extensions
.AddHours(hours) .AddHours(hours)
.AddMinutes(minutes); .AddMinutes(minutes);
} }
/// <summary>
/// 将 DateTime 时间转换为 DateTimeOffset 时间
/// </summary>
/// <param name="rawDateTime"></param>
/// <returns></returns>
public static DateTimeOffset GetDateTimeOffset(this DateTime rawDateTime)
{
//确保 Kind 为 Local如果是 Unspecified
DateTime localDateTime = rawDateTime.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(rawDateTime, DateTimeKind.Local)
: rawDateTime;
// 转换为 DateTimeOffset自动应用本地时区偏移
return new DateTimeOffset(localDateTime);
}
} }
} }