From 42a446c24d8931aa9755b51342f15e803a3b731e Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 23 Apr 2025 11:13:59 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84IoTDB=E5=AD=98=E5=82=A8?=
=?UTF-8?q?=E8=B7=AF=E5=BE=84=E7=BA=A6=E6=9D=9F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../JiShe.CollectBus.IoTDB/Model/IoTEntity.cs | 16 +++++------
.../Provider/DevicePathBuilder.cs | 4 +--
.../BasicScheduledMeterReadingService.cs | 28 ++++++++++---------
3 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
index 92f98e1..a079bd4 100644
--- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs
@@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model
/// 系统名称
///
[TAGColumn]
- public string SystemName { get; set; }
+ public string SystemName { get; set; }
///
/// 项目编码
///
- [ATTRIBUTEColumn]
- public string ProjectId { get; set; }
+ [TAGColumn]
+ public string ProjectId { get; set; }
///
/// 设备类型集中器、电表、水表、流量计、传感器等
///
- [ATTRIBUTEColumn]
- public string DeviceType { get; set; }
+ [TAGColumn]
+ public string DeviceType { get; set; }
///
- /// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址
+ /// 设备ID,数据生成者,例如集中器ID,电表ID、水表ID、流量计ID、传感器ID等
///
[TAGColumn]
- public string DeviceId { get; set; }
+ public string DeviceId { get; set; }
///
/// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取
///
- public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs
index 6922c62..6a1a596 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs
@@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public static string GetDevicePath(T entity) where T : IoTEntity
{
- return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`";
+ return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
}
@@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public static string GetDeviceTableName(T entity) where T : IoTEntity
{
- return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`";
+ return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`";
}
}
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 095f01b..49770ef 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,5 +1,6 @@
using Amazon.Runtime.Internal.Endpoints.StandardLibrary;
using Confluent.Kafka;
+using DeviceDetectorNET.Parser.Device;
using DnsClient.Protocol;
using FreeSql;
using JiShe.CollectBus.Ammeters;
@@ -172,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = CreateMeterPublishTask(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
- meterType: MeterTypeEnum.Ammeter,
+ meterType: MeterTypeEnum.WaterMeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
@@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var timer = Stopwatch.StartNew();
- List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。
+ List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
@@ -320,11 +321,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
continue;
}
-
- focusAddressDataList.Add(item.Key);
-
+
foreach (var ammeter in item)
{
+ deviceIds.Add(ammeter.MeterId.ToString());
+
//处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
@@ -378,14 +379,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//初始化设备组负载控制
- if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
+ if (deviceIds == null || deviceIds.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions);
+ DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
timer.Stop();
@@ -661,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
- DeviceId = $"{ammeterInfo.FocusAddress}",
+ DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = timestamps,
@@ -723,7 +724,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
- List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。
+ List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
@@ -761,7 +762,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- focusAddressDataList.Add(item.Key);
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
@@ -769,6 +769,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var subItem in item)
{
+ deviceIds.Add(subItem.MeterId.ToString());
+
watermeterInfo.Add(subItem);
}
@@ -780,14 +782,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//初始化设备组负载控制
- if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
+ if (deviceIds == null || deviceIds.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions);
+ DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
@@ -954,7 +956,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
- DeviceId = $"{watermeter.FocusAddress}",
+ DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PendingCopyReadTime = timestamps,