diff --git a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs index 92f98e1..a079bd4 100644 --- a/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs +++ b/modules/JiShe.CollectBus.IoTDB/Model/IoTEntity.cs @@ -11,29 +11,29 @@ namespace JiShe.CollectBus.IoTDB.Model /// 系统名称 /// [TAGColumn] - public string SystemName { get; set; } + public string SystemName { get; set; } /// /// 项目编码 /// - [ATTRIBUTEColumn] - public string ProjectId { get; set; } + [TAGColumn] + public string ProjectId { get; set; } /// /// 设备类型集中器、电表、水表、流量计、传感器等 /// - [ATTRIBUTEColumn] - public string DeviceType { get; set; } + [TAGColumn] + public string DeviceType { get; set; } /// - /// 设备ID,也就是通信设备的唯一标识符,例如集中器地址,或者其他传感器设备地址 + /// 设备ID,数据生成者,例如集中器ID,电表ID、水表ID、流量计ID、传感器ID等 /// [TAGColumn] - public string DeviceId { get; set; } + public string DeviceId { get; set; } /// /// 时标,也就是业务时间戳,单位毫秒,必须通过DateTimeOffset获取 /// - public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + public long Timestamps { get; set; } = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs index 6922c62..6a1a596 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/DevicePathBuilder.cs @@ -15,7 +15,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDevicePath(T entity) where T : IoTEntity { - return $"root.{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; + return $"root.{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } @@ -39,7 +39,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public static string GetDeviceTableName(T entity) where T : IoTEntity { - return $"{entity.SystemName.ToLower()}.`{entity.DeviceId}`"; + return $"{entity.SystemName.ToLower()}.`{entity.ProjectId}`.`{entity.DeviceType}`.`{entity.DeviceId}`"; } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 095f01b..49770ef 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,5 +1,6 @@ using Amazon.Runtime.Internal.Endpoints.StandardLibrary; using Confluent.Kafka; +using DeviceDetectorNET.Parser.Device; using DnsClient.Protocol; using FreeSql; using JiShe.CollectBus.Ammeters; @@ -172,7 +173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, - meterType: MeterTypeEnum.Ammeter, + meterType: MeterTypeEnum.WaterMeter, taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { var tempTask = WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); @@ -280,7 +281,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var timer = Stopwatch.StartNew(); - List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); @@ -320,11 +321,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading { continue; } - - focusAddressDataList.Add(item.Key); - + foreach (var ammeter in item) { + deviceIds.Add(ammeter.MeterId.ToString()); + //处理ItemCode if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) { @@ -378,14 +379,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //初始化设备组负载控制 - if (focusAddressDataList == null || focusAddressDataList.Count <= 0) + if (deviceIds == null || deviceIds.Count <= 0) { _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); } else { - DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); + DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); } timer.Stop(); @@ -661,7 +662,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading SystemName = SystemType, ProjectId = $"{ammeterInfo.ProjectID}", DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{ammeterInfo.FocusAddress}", + DeviceId = $"{ammeterInfo.MeterId}", Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = ammeterInfo.DatabaseBusiID, PendingCopyReadTime = timestamps, @@ -723,7 +724,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空"); } - List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); @@ -761,7 +762,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - focusAddressDataList.Add(item.Key); var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; @@ -769,6 +769,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading foreach (var subItem in item) { + deviceIds.Add(subItem.MeterId.ToString()); + watermeterInfo.Add(subItem); } @@ -780,14 +782,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //初始化设备组负载控制 - if (focusAddressDataList == null || focusAddressDataList.Count <= 0) + if (deviceIds == null || deviceIds.Count <= 0) { _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); } else { - DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); + DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); } @@ -954,7 +956,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading SystemName = SystemType, ProjectId = $"{watermeter.ProjectID}", DeviceType = $"{MeterTypeEnum.Ammeter}", - DeviceId = $"{watermeter.FocusAddress}", + DeviceId = $"{watermeter.MeterId}", Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = watermeter.DatabaseBusiID, PendingCopyReadTime = timestamps,