完善IoTDB存储路径约束

This commit is contained in:
ChenYi 2025-04-23 11:13:59 +08:00
parent 17bb8a5692
commit 42a446c24d
3 changed files with 25 additions and 23 deletions

View File

@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 系统名称 /// 系统名称
/// </summary> /// </summary>
[TAGColumn] [TAGColumn]
public string SystemName { get; set; } public string SystemName { get; set; }
/// <summary> /// <summary>
/// 项目编码 /// 项目编码
/// </summary> /// </summary>
[ATTRIBUTEColumn] [TAGColumn]
public string ProjectId { get; set; } public string ProjectId { get; set; }
/// <summary> /// <summary>
/// 设备类型集中器、电表、水表、流量计、传感器等 /// 设备类型集中器、电表、水表、流量计、传感器等
/// </summary> /// </summary>
[ATTRIBUTEColumn] [TAGColumn]
public string DeviceType { get; set; } public string DeviceType { get; set; }
/// <summary> /// <summary>
/// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 /// 设备ID,数据生成者例如集中器ID,电表ID、水表ID、流量计ID、传感器ID等
/// </summary> /// </summary>
[TAGColumn] [TAGColumn]
public string DeviceId { get; set; } public string DeviceId { get; set; }
/// <summary> /// <summary>
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
/// </summary> /// </summary>
public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
} }
} }

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public static string GetDevicePath<T>(T entity) where T : IoTEntity public static string GetDevicePath<T>(T entity) where T : IoTEntity
{ {
return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
} }
@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public static string GetDeviceTableName<T>(T entity) where T : IoTEntity public static string GetDeviceTableName<T>(T entity) where T : IoTEntity
{ {
return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
} }
} }

View File

@ -1,5 +1,6 @@
using Amazon.Runtime.Internal.Endpoints.StandardLibrary; using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
using Confluent.Kafka; using Confluent.Kafka;
using DeviceDetectorNET.Parser.Device;
using DnsClient.Protocol; using DnsClient.Protocol;
using FreeSql; using FreeSql;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
@ -172,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = CreateMeterPublishTask<WatermeterInfo>( _ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: currentTaskTime, nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.WaterMeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) => taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{ {
var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
var timer = Stopwatch.StartNew(); var timer = Stopwatch.StartNew();
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。 List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组 //根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
@ -321,10 +322,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
focusAddressDataList.Add(item.Key);
foreach (var ammeter in item) foreach (var ammeter in item)
{ {
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode //处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{ {
@ -378,14 +379,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//初始化设备组负载控制 //初始化设备组负载控制
if (focusAddressDataList == null || focusAddressDataList.Count <= 0) if (deviceIds == null || deviceIds.Count <= 0)
{ {
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
} }
else else
{ {
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
} }
timer.Stop(); timer.Stop();
@ -661,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
SystemName = SystemType, SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}", ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}", DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.FocusAddress}", DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = timestamps, PendingCopyReadTime = timestamps,
@ -723,7 +724,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
} }
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。 List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组 //根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
@ -761,7 +762,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
focusAddressDataList.Add(item.Key);
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
@ -769,6 +769,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var subItem in item) foreach (var subItem in item)
{ {
deviceIds.Add(subItem.MeterId.ToString());
watermeterInfo.Add(subItem); watermeterInfo.Add(subItem);
} }
@ -780,14 +782,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
//初始化设备组负载控制 //初始化设备组负载控制
if (focusAddressDataList == null || focusAddressDataList.Count <= 0) if (deviceIds == null || deviceIds.Count <= 0)
{ {
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
} }
else else
{ {
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
} }
@ -954,7 +956,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
SystemName = SystemType, SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}", ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}", DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.FocusAddress}", DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID, DatabaseBusiID = watermeter.DatabaseBusiID,
PendingCopyReadTime = timestamps, PendingCopyReadTime = timestamps,