2025-04-23 16:17:29 +08:00
using JiShe.CollectBus.Application.Contracts ;
2025-04-25 09:28:20 +08:00
using JiShe.CollectBus.Common ;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Consts ;
2025-04-14 21:56:24 +08:00
using JiShe.CollectBus.Common.DeviceBalanceControl ;
2025-04-22 22:11:55 +08:00
using JiShe.CollectBus.Common.Encrypt ;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Enums ;
2025-03-14 17:28:58 +08:00
using JiShe.CollectBus.Common.Extensions ;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Helpers ;
2025-04-16 17:36:46 +08:00
using JiShe.CollectBus.Common.Models ;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.GatherItem ;
2025-04-21 22:57:49 +08:00
using JiShe.CollectBus.IoTDB.Context ;
2025-04-18 11:31:23 +08:00
using JiShe.CollectBus.IoTDB.Interface ;
2025-04-22 16:44:47 +08:00
using JiShe.CollectBus.IoTDB.Model ;
using JiShe.CollectBus.IoTDB.Options ;
using JiShe.CollectBus.IoTDB.Provider ;
2025-04-23 16:17:29 +08:00
using JiShe.CollectBus.IotSystems.Ammeters ;
2025-03-18 22:43:24 +08:00
using JiShe.CollectBus.IotSystems.MeterReadingRecords ;
2025-03-14 14:38:08 +08:00
using JiShe.CollectBus.IotSystems.Watermeter ;
2025-04-19 14:00:13 +08:00
using JiShe.CollectBus.Kafka.Internal ;
2025-04-15 15:49:51 +08:00
using JiShe.CollectBus.Kafka.Producer ;
2025-04-25 14:23:06 +08:00
using JiShe.CollectBus.Protocol.Interfaces ;
2025-03-14 14:24:38 +08:00
using Microsoft.Extensions.Logging ;
2025-04-17 13:54:18 +08:00
using Microsoft.Extensions.Options ;
2025-04-09 23:11:36 +08:00
using System ;
using System.Collections.Generic ;
using System.Diagnostics ;
using System.Linq ;
using System.Threading.Tasks ;
2025-04-27 09:16:37 +08:00
using JiShe.CollectBus.Protocol.Models ;
2025-04-29 23:48:47 +08:00
using System.Threading.Channels ;
2025-04-30 12:36:54 +08:00
using static IdentityModel . ClaimComparer ;
using JiShe.CollectBus.DataChannels ;
using JiShe.CollectBus.DataMigration.Options ;
using static System . Runtime . InteropServices . JavaScript . JSType ;
2025-04-30 15:57:14 +08:00
using static System . Formats . Asn1 . AsnWriter ;
2025-05-06 14:33:49 +08:00
using System.Threading ;
2025-03-14 14:24:38 +08:00
2025-03-14 14:38:08 +08:00
namespace JiShe.CollectBus.ScheduledMeterReading
2025-03-14 14:24:38 +08:00
{
/// <summary>
/// 定时采集服务
/// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService , IScheduledMeterReadingService
{
// Logger used for all diagnostics emitted by this service.
private readonly ILogger<BasicScheduledMeterReadingService> _logger;

// IoTDB provider injected through the constructor.
private readonly IIoTDbProvider _dbProvider;

// Channel manager used to write scheduled task batches to, and read them from, the task data channel.
private readonly IDataChannelManageService _dataChannelManage;

// Redis cache service used for paged reads and batch inserts of meter information.
private readonly IRedisDataCacheService _redisDataCacheService;

// Resolves the protocol plugin that matches a meter's brand type.
private readonly IProtocolService _protocolService;

// Unwrapped data-migration options (captured in the constructor).
private readonly DataMigrationOptions _dataMigrationOptions;

// Unwrapped Kafka options; NumPartitions sizes the device-group balance cache.
private readonly KafkaOptionConfig _kafkaOptions;

// Unwrapped server application options (first collection time and the daily job times).
private readonly ServerApplicationOptions _applicationOptions;

// Page size passed to the IoTDB query options when pending task data is fetched.
private int pageSize = 10000;
2025-04-14 16:41:41 +08:00
2025-03-17 14:23:48 +08:00
/// <summary>
/// Creates the service and captures its collaborators and unwrapped option values.
/// </summary>
/// <param name="logger">Logger for this service.</param>
/// <param name="dataChannelManage">Manager for the scheduled-task data channel.</param>
/// <param name="redisDataCacheService">Redis cache service for meter information.</param>
/// <param name="dbProvider">IoTDB provider.</param>
/// <param name="protocolService">Protocol plugin resolver.</param>
/// <param name="dataMigrationOptions">Data-migration options wrapper.</param>
/// <param name="kafkaOptions">Kafka options wrapper.</param>
/// <param name="applicationOptions">Server application options wrapper.</param>
public BasicScheduledMeterReadingService(
    ILogger<BasicScheduledMeterReadingService> logger,
    IDataChannelManageService dataChannelManage,
    IRedisDataCacheService redisDataCacheService,
    IIoTDbProvider dbProvider,
    IProtocolService protocolService,
    IOptions<DataMigrationOptions> dataMigrationOptions,
    IOptions<KafkaOptionConfig> kafkaOptions,
    IOptions<ServerApplicationOptions> applicationOptions)
{
    // Option wrappers are unwrapped once so the rest of the class works with plain option objects.
    _dataMigrationOptions = dataMigrationOptions.Value;
    _kafkaOptions = kafkaOptions.Value;
    _applicationOptions = applicationOptions.Value;

    // Injected services.
    _logger = logger;
    _protocolService = protocolService;
    _redisDataCacheService = redisDataCacheService;
    _dataChannelManage = dataChannelManage;
    _dbProvider = dbProvider;
}
/// <summary>
/// System type identifier supplied by the concrete service; it is used as a segment of the Redis cache keys built in this class.
/// </summary>
public abstract string SystemType { get ; }
2025-04-09 17:29:30 +08:00
/// <summary>
/// Deployment tag of the application server; it is used as a segment of the Redis cache keys built in this class.
/// </summary>
public abstract string ServerTagName { get ; }
2025-03-14 14:24:38 +08:00
/// <summary>
/// Collection item codes for ammeter daily-freeze data.
/// </summary>
protected List < string > DayFreezeCodes = new List < string > ( ) { "0D_3" , "0D_4" , "0D_161" , "0D_162" , "0D_163" , "0D_164" , "0D_165" , "0D_166" , "0D_167" , "0D_168" , "0C_149" , } ;
/// <summary>
/// Collection item codes for ammeter monthly-freeze data.
/// </summary>
protected List < string > MonthFreezeCodes = new List < string > ( ) { "0D_177" , "0D_178" , "0D_179" , "0D_180" , "0D_181" , "0D_182" , "0D_183" , "0D_184" , "0D_193" , "0D_195" , } ;
/// <summary>
/// Returns the list of collection (gather) items.
/// The base implementation always throws; each concrete system type must override it.
/// </summary>
/// <returns>The collection items of the concrete system.</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
    => throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
2025-03-18 15:58:37 +08:00
/// <summary>
/// Builds the pending command-issuing tasks for every task definition cached in Redis.
/// </summary>
/// <remarks>
/// For each key matching <c>{CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*</c> the method:
/// <list type="number">
/// <item>starts the once-a-day ammeter jobs (time sync, terminal version, telematics module, day freeze,
/// month freeze) when the current minute equals the configured time of that job;</item>
/// <item>starts the periodic reading task when the next collection point of the key is due;</item>
/// <item>writes the advanced <c>LastTaskTime</c>/<c>NextTaskTime</c> back to Redis.</item>
/// </list>
/// After the loop the ammeter scheduled valve-control tasks are written to the task channel.
/// Task creation and channel writes are started without being awaited (fire-and-forget), as in the original design.
/// </remarks>
/// <returns>A task that completes once every cached task definition has been examined.</returns>
public virtual async Task CreateToBeIssueTasks()
{
    var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
    var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
    if (taskInfos == null || taskInfos.Length <= 0)
    {
        _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
        return;
    }

    var currentTime = DateTime.Now;

    // The configured job times are compared as "HH:mm:00" text. The invariant culture keeps ':' as the
    // separator no matter which culture the host runs under (':' in a custom format is culture-sensitive).
    var currentTimeStr = currentTime.ToString("HH:mm:00", System.Globalization.CultureInfo.InvariantCulture);

    // Scheduled reading.
    foreach (var item in taskInfos)
    {
        var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
        if (tasksToBeIssueModel == null)
        {
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
            continue;
        }

        // item is the Redis key of a cached task definition (CacheTasksToBeIssuedKey):
        // [0]=CollectBus, [1]=system type, [2]=server tag, [3]=TaskInfo, [4]=meter type, [5]=collection frequency.
        var keyParts = item.Split(":");
        if (keyParts.Length < 6 || !int.TryParse(keyParts[5], out var timeDensity))
        {
            // A malformed key must not abort the whole scheduling pass.
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}格式无效,已跳过");
            continue;
        }

        string meteryType = keyParts[4]; // meter type
        if (timeDensity > 15)
        {
            // Frequencies above 15 minutes are capped at 15.
            timeDensity = 15;
        }

        // Once-a-day ammeter jobs. Each one fires when the current minute equals its configured time.
        // NOTE(review): this chain runs for every cached key, so a job may be started once per key in
        // the same minute — confirm that downstream handling tolerates this.
        if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.OrdinalIgnoreCase)) // automatic time sync
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmeterScheduledAutomaticVerificationTime(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"电表自动校时 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
                });
        }
        else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.OrdinalIgnoreCase)) // concentrator version read
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"集中器 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
                });
        }
        else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.OrdinalIgnoreCase)) // SIM card / telematics module read
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"集中器 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
                });
        }
        else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.OrdinalIgnoreCase)) // daily freeze
        {
            // Fix: the day-freeze time previously invoked the month-freeze builder.
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"电表 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
                });
        }
        // Fix: this branch used to repeat the AutomaticTelematicsModuleTime condition and was unreachable.
        // NOTE(review): assumes ServerApplicationOptions exposes AutomaticMonthFreezeTime next to
        // AutomaticDayFreezeTime — confirm against the options class.
        else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.OrdinalIgnoreCase)) // monthly freeze
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"电表 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
                });
        }
        else
        {
            _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
        }

        // The scheduler runs more often than the collection frequency, so the key is skipped unless
        // the next collection point derived from LastTaskTime is due now.
        var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);
        if (!IsTaskTime(currentTaskTime, timeDensity))
        {
            _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
            continue;
        }

        if (meteryType == MeterTypeEnum.Ammeter.ToString())
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmerterCreatePublishTaskAction(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        // Meters that produce no task data are skipped silently.
                        return;
                    }

                    // NOTE(review): periodic ammeter readings are written under the valve-control event
                    // name, while water meters use an auto-reading event name — confirm this is intended.
                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask));
                });
        }
        else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
        {
            _ = CreateMeterPublishTask<WatermeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.WaterMeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await WatermeterCreatePublishTaskAction(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"水表 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }

                    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
                });
        }
        else
        {
            _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
        }

        _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");

        // Advance the task point here, independently of the task-building outcome, so a failure while
        // building tasks cannot stall the creation of later tasks.
        tasksToBeIssueModel.LastTaskTime = currentTaskTime;
        tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
        await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
    }

    // Ammeter scheduled valve-control tasks.
    var autoValveControlTask = await AmmeterScheduledAutoValveControl();
    if (autoValveControlTask == null || autoValveControlTask.Count <= 0)
    {
        _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
        return;
    }

    _ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
}
2025-03-14 14:24:38 +08:00
#region 电 表 采 集 处 理
/// <summary>
/// Returns the ammeter information list.
/// The base implementation always throws; each concrete system type must override it.
/// </summary>
/// <param name="gatherCode">Code of the collector (gather end).</param>
/// <returns>The ammeters of the concrete system.</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
/// <summary>
/// Initializes the ammeter cache: stores one task definition per collection frequency in Redis,
/// batch-inserts the ammeter information per frequency, and initializes the device-group balance cache.
/// </summary>
/// <remarks>
/// In DEBUG builds the method only reads the already cached ammeters back from Redis (using a fixed
/// server tag and a 15-minute frequency), initializes the balance cache from them, and returns.
/// </remarks>
/// <param name="gatherCode">Code of the collector (gather end) passed to <see cref="GetAmmeterInfoList"/>.</param>
/// <returns>A task that completes when the cache has been initialized.</returns>
/// <exception cref="NullReferenceException">Thrown when no ammeter data or no collection item data is available.</exception>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
    // Start the task-channel reader in the background. The token source is handed to the reader and
    // is neither cancelled nor disposed in this method.
    var cts = new CancellationTokenSource();
    _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader, cts.Token);

    // Do not delete the DEBUG section below (kept at the original author's request).
#if DEBUG
    var timeDensity = "15";
    var serverTagName = "JiSheCollectBus2";
    var redisCacheMeterInfoHashKeyTemp = string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity);
    var redisCacheMeterInfoSetIndexKeyTemp = string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity);
    var redisCacheMeterInfoZSetScoresIndexKeyTemp = string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity);

    List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
    List<string> focusAddressDataLista = new List<string>();
    var timer1 = Stopwatch.StartNew();
    var allIds = new HashSet<string>();
    decimal? score = null;
    string member = null;

    // Page through the cached ammeters; score/member carry the paging cursor between calls.
    while (true)
    {
        var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
            redisCacheMeterInfoHashKeyTemp,
            redisCacheMeterInfoZSetScoresIndexKeyTemp,
            pageSize: 1000,
            lastScore: score,
            lastMember: member);

        meterInfos.AddRange(page.Items);
        focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
        foreach (var item in page.Items)
        {
            if (!allIds.Add(item.MemberId))
            {
                _logger.LogError($"{item.MemberId}Duplicate data found!");
            }
        }

        if (!page.HasNext) break;
        score = page.NextScore;
        member = page.NextMember;
    }

    timer1.Stop();
    _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
    DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
    return;
#else
    var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
    }

    // Collection item definitions, used to translate a meter's data types into item codes.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
    }

    var timer = Stopwatch.StartNew();

    // Meter ids handed to the device-group balance control (sized by the Kafka partition count).
    List<string> deviceIds = new List<string>();

    // TODO (from the original code): transparent-forwarding items for special meters
    // (tariff periods, trip/close) are excluded. Exact match — a substring test would also
    // exclude unrelated codes such as "10_9".
    var excludedItemCodes = new HashSet<string> { "10_98", "10_94" };

    // Group by collection frequency. Materialized because the groups are enumerated twice.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();

    var currentTaskTime = DateTime.Now;
    if (_applicationOptions.FirstCollectionTime.HasValue == false)
    {
        _applicationOptions.FirstCollectionTime = currentTaskTime;
    }

    // First store one task definition per collection frequency.
    foreach (var item in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
            TimeDensity = item.Key,
        };
        nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);

        // TODO (from the original code): collection points missed between the first collection time
        // and now could be recovered through IoTDB time series.
        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        var redisCacheMeterInfoHashKey = string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
        var redisCacheMeterInfoSetIndexKey = string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
        var redisCacheMeterInfoZSetScoresIndexKey = string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);

        List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();

        // Group the meters of this frequency by concentrator address.
        var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
        foreach (var item in meterInfoGroup)
        {
            if (string.IsNullOrWhiteSpace(item.Key))
            {
                // Meters without a concentrator address are skipped.
                continue;
            }

            foreach (var ammeter in item)
            {
                deviceIds.Add(ammeter.MeterId.ToString());

                // Derive the item codes from the data types when the meter has none of its own.
                if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
                {
                    var itemArr = ammeter.DataTypes.Split(',').ToList();
                    List<string> itemCodeList = new List<string>();

                    foreach (var dataType in itemArr)
                    {
                        var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
                        var itemCode = gatherItem?.ItemCode;
                        if (!string.IsNullOrWhiteSpace(itemCode) && !excludedItemCodes.Contains(itemCode))
                        {
                            itemCodeList.Add(itemCode);
                        }
                    }

                    // Special meter item codes. Added once per meter: adding them inside the loop above
                    // duplicated them once per data type.
                    if (itemArr.Contains("95")) // Delixi DTS
                    {
                        itemCodeList.Add("10_95");
                    }
                    if (itemArr.Contains("109")) // WAVE_109
                    {
                        itemCodeList.Add("10_109");
                    }

                    ammeter.ItemCodes = itemCodeList.Serialize(); // stored as a JSON string
                    if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
                    {
                        ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
                    }
                }

                ammeterInfos.Add(ammeter);
            }
        }

        await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
            redisCacheMeterInfoHashKey,
            redisCacheMeterInfoSetIndexKey,
            redisCacheMeterInfoZSetScoresIndexKey,
            ammeterInfos);
    }

    // Initialize the device-group load balancing.
    if (deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    timer.Stop();
    _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
/// <summary>
/// One-minute ammeter reading job: fetches the already built task data for the pending
/// collection point and hands it over for publishing. It does not build tasks itself.
/// </summary>
/// <returns>A task that completes once publishing has been started (the publish call is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
    // Fix: this was 5, which made the one-minute job read the five-minute task definition.
    int timeDensity = 1;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null)
    {
        // Fix: the message previously named the fifteen-minute method.
        _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟获取任务下发信息失败, 请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Only records whose PendingCopyReadTime equals the last task time are selected.
    var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();

    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // Fire-and-forget, as in the original design.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
/// <summary>
/// Five-minute ammeter reading job: fetches the already built task data for the pending
/// collection point and hands it over for publishing.
/// </summary>
/// <returns>A task that completes once publishing has been started (the publish call is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{
    int timeDensity = 5;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null)
    {
        // Fix: the message previously named the fifteen-minute method.
        _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟获取任务下发信息失败, 请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Only records whose PendingCopyReadTime equals the last task time are selected.
    var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();

    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // Fire-and-forget, as in the original design.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
/// <summary>
/// Fifteen-minute ammeter reading job: looks up the cached task definition for the 15-minute
/// frequency and starts publishing the task data stored for its last task time.
/// </summary>
/// <returns>A task that completes once publishing has been started (the publish call is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
    const int timeDensity = 15;

    var cacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var cachedTask = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(cacheKey);
    if (cachedTask == null)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败, 请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Select only the records whose PendingCopyReadTime equals the last task time.
    var lastTaskTimestamp = cachedTask.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
    var filters = new List<QueryCondition>
    {
        new QueryCondition
        {
            Field = "PendingCopyReadTime",
            Operator = "=",
            IsNumber = true,
            Value = lastTaskTimestamp,
        },
    };

    var queryOptions = new IoTDBQueryOptions
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = filters,
    };

    // Started without awaiting, exactly like the other frequency jobs.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(
        ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,
        queryOptions);
}
/// <summary>
2025-04-18 17:46:24 +08:00
/// 创建电表待发送的任务数据
2025-03-14 14:24:38 +08:00
/// </summary>
2025-03-18 15:58:37 +08:00
/// <param name="timeDensity">采集频率</param>
2025-04-18 17:46:24 +08:00
/// <param name="ammeterInfo">电表信息</param>
2025-04-17 11:29:26 +08:00
/// <param name="groupIndex">集中器所在分组</param>
2025-04-21 22:57:49 +08:00
/// <param name="timestamps">采集频率对应的时间戳</param>
2025-03-14 14:24:38 +08:00
/// <returns></returns>
2025-04-24 17:48:20 +08:00
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    // Builds the pending collection packets for a single ammeter.
    // Returns null when no protocol plug-in is found or the meter fails validation;
    // otherwise returns one packet per item code for which a frame could be built
    // (the list may be empty).
    var currentTime = DateTime.Now;

    // Resolve the protocol plug-in from the meter's brand type (failure code -105).
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        return null;
    }

    // No collection items configured (failure code -101).
    if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
    {
        return null;
    }

    // Carrier-wave meters are not handled here (failure code -102).
    if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
    {
        return null;
    }

    // State 2 is skipped; the previous log text described it as "disabled".
    if (ammeterInfo.State.Equals(2))
    {
        return null;
    }

    // NOTE(review): a filter for concentrators offline for more than one day used to live here
    // as commented-out code; it is not active.

    // Both the concentrator area code and address are required.
    if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode) || string.IsNullOrWhiteSpace(ammeterInfo.Address))
    {
        return null;
    }

    // The address must be numeric and must not exceed 65535. A non-numeric address previously
    // threw out of Convert.ToInt32; it is now rejected like every other invalid meter.
    if (!int.TryParse(ammeterInfo.Address, out var addressValue) || addressValue > 65535)
    {
        return null;
    }

    // Measuring point number must be within 1..33.
    if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
    {
        return null;
    }

    List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
    if (tempCodes == null)
    {
        // Nothing usable could be deserialized; treat it like an empty configuration.
        return null;
    }

    // TODO: meters that report automatically are only actively polled for class-1 data.
    if (ammeterInfo.AutomaticReport.Equals(1))
    {
        var tempSubCodes = new List<string>();
        if (tempCodes.Contains("0C_49"))
        {
            tempSubCodes.Add("0C_49");
        }

        // Fixed: this used to test tempSubCodes (the list being built), so "0C_149" was never kept.
        if (tempCodes.Contains("0C_149"))
        {
            tempSubCodes.Add("0C_149");
        }

        // Kept as a substring test on the raw configuration string, exactly as before.
        if (ammeterInfo.ItemCodes.Contains("10_97"))
        {
            tempSubCodes.Add("10_97");
        }

        if (tempSubCodes.Count <= 0)
        {
            return null;
        }

        tempCodes = tempSubCodes;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
    foreach (var tempItem in tempCodes)
    {
        // Day-freeze and month-freeze items are excluded from this collection pass.
        if (DayFreezeCodes.Contains(tempItem) || MonthFreezeCodes.Contains(tempItem))
        {
            continue;
        }

        // TODO: special meters.
        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = ammeterInfo.FocusAddress,
            Pn = ammeterInfo.MeteringCode,
            ItemCode = tempItem,
            DataTimeMark = new Protocol.DataTimeMark()
            {
                Density = ammeterInfo.TimeDensity, // TODO: convert to the protocol's density value
                Point = 1,
                DataTime = timestamps,
            }
        });

        // Skip items for which no frame could be built.
        if (builderResponse == null || builderResponse.Data.Length <= 0)
        {
            continue;
        }

        var meterReadingRecords = CreateAmmeterPacketInfo(
            ammeterInfo: ammeterInfo,
            timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
            builderResponse: builderResponse,
            itemCode: tempItem,
            subItemCode: null,
            pendingCopyReadTime: timestamps,
            creationTime: currentTime,
            packetType: (TelemetryPacketTypeEnum)timeDensity);
        taskList.Add(meterReadingRecords);
    }

    return taskList;
}
2025-04-23 16:17:29 +08:00
/// <summary>
/// Gets the ammeter automatic valve-control settings.
/// </summary>
/// <param name="currentTime">The valve-control time.</param>
/// <returns>The valve-control settings; must be supplied by a system-specific override.</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
    // Fixed: the message previously named GetAmmeterInfoList instead of this method.
    throw new NotImplementedException($"{nameof(GetAmmeterAutoValveControlSetting)}请根据不同系统类型进行实现");
}
/// <summary>
/// Scheduled automatic valve control for ammeters.
/// </summary>
/// <returns>The generated packets; must be supplied by a system-specific override.</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl()
    => throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
2025-04-25 12:01:15 +08:00
/// <summary>
/// Builds the automatic time-verification (clock sync) packet for one ammeter.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Timestamp for the collection frequency (not used by this method).</param>
/// <returns>
/// A list holding the generated packet, or null when the current minute is not the configured
/// verification time, no protocol plug-in is found, or no frame could be built.
/// </returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // Only run in the configured verification minute.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
        return null;
    }

    var itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
    var subItemCode = T6452007PacketItemCodeConst.C08;

    // Resolve the protocol plug-in from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时运行时间{currentTime}没有找到对应的协议组件,-105");
        return null;
    }

    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = ammeterInfo.FocusAddress,
        Pn = ammeterInfo.MeteringCode,
        ItemCode = itemCode,
        SubProtocolRequest = new SubProtocolBuildRequest()
        {
            MeterAddress = ammeterInfo.AmmerterAddress,
            Password = ammeterInfo.Password,
            ItemCode = subItemCode,
        }
    });

    // Same guard the regular collection path applies before creating a packet.
    if (builderResponse == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时运行时间{currentTime}没有自动校时任务生成,-106");
        return null;
    }

    var meterReadingRecords = CreateAmmeterPacketInfo(
        ammeterInfo: ammeterInfo,
        timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        builderResponse: builderResponse,
        itemCode: itemCode,
        subItemCode: subItemCode,
        pendingCopyReadTime: currentTime,
        creationTime: currentTime,
        packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime);

    // Fixed: the packet used to be built and then discarded by an unconditional "return null".
    // TODO: persist the record and push it to the new service.
    return new List<MeterReadingTelemetryPacketInfo> { meterReadingRecords };
}
2025-04-27 17:27:04 +08:00
/// <summary>
/// Builds the day-freeze reading packets for one ammeter.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Timestamp for the collection frequency (not used by this method).</param>
/// <returns>
/// One packet per day-freeze item code that produced a frame, or null when the current minute
/// is not the configured time, no protocol plug-in is found, or nothing was generated.
/// </returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // NOTE(review): this gate reuses AutomaticVerificationTime, which looks copied from the
    // time-verification job - confirm whether a dedicated day-freeze time option should be used.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 日冻结抄读,非自动处理时间");
        return null;
    }

    // Resolve the protocol plug-in from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 日冻结抄读时间{currentTime}没有找到对应的协议组件,-105");
        return null;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
    foreach (var item in DayFreezeCodes)
    {
        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = ammeterInfo.FocusAddress,
            Pn = ammeterInfo.MeteringCode,
            ItemCode = item
        });

        // Same guard the regular collection path applies: skip items with no usable frame.
        if (builderResponse == null || builderResponse.Data.Length <= 0)
        {
            _logger.LogWarning($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{item}未能正确获取报文。");
            continue;
        }

        var meterReadingRecords = CreateAmmeterPacketInfo(
            ammeterInfo: ammeterInfo,
            timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            builderResponse: builderResponse,
            itemCode: item,
            subItemCode: null,
            pendingCopyReadTime: currentTime,
            creationTime: currentTime,
            packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze);
        taskList.Add(meterReadingRecords);
    }

    if (taskList.Count <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 日冻结抄读时间{currentTime}没有任务生成,-106");
        return null;
    }

    return taskList;
}
/// <summary>
/// Builds the month-freeze reading packets for one ammeter.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Timestamp for the collection frequency (not used by this method).</param>
/// <returns>
/// One packet per month-freeze item code that produced a frame, or null when the current minute
/// is not the configured time, no protocol plug-in is found, or nothing was generated.
/// </returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // NOTE(review): this gate reuses AutomaticVerificationTime, which looks copied from the
    // time-verification job - confirm whether a dedicated month-freeze time option should be used.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 月冻结抄读,非自动处理时间");
        return null;
    }

    // Resolve the protocol plug-in from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 月冻结抄读时间{currentTime}没有找到对应的协议组件,-105");
        return null;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();

    // Fixed: this loop used to iterate DayFreezeCodes, so month-freeze packets carried
    // day-freeze item codes.
    foreach (var item in MonthFreezeCodes)
    {
        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = ammeterInfo.FocusAddress,
            Pn = ammeterInfo.MeteringCode,
            ItemCode = item
        });

        // Same guard the regular collection path applies: skip items with no usable frame.
        if (builderResponse == null || builderResponse.Data.Length <= 0)
        {
            _logger.LogWarning($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{item}未能正确获取报文。");
            continue;
        }

        var meterReadingRecords = CreateAmmeterPacketInfo(
            ammeterInfo: ammeterInfo,
            timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
            builderResponse: builderResponse,
            itemCode: item,
            subItemCode: null,
            pendingCopyReadTime: currentTime,
            creationTime: currentTime,
            packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze);
        taskList.Add(meterReadingRecords);
    }

    if (taskList.Count <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 月冻结抄读时间{currentTime}没有任务生成,-106");
        return null;
    }

    return taskList;
}
2025-04-29 09:16:48 +08:00
/// <summary>
/// Ammeter monitoring job that retries unanswered reading tasks.
/// </summary>
/// <param name="retryReadingEnum">Retry task kind; its name is used as the lock key.</param>
/// <returns>A task that completes when the retry pass has finished.</returns>
public virtual async Task AmmeterScheduledRetryReading(RetryReadingEnum retryReadingEnum)
{
    // Each retry task kind takes its own lock.
    var tryLock = FreeRedisProvider.Instance.Lock(retryReadingEnum.ToString(), 10);
    if (tryLock == null)
    {
        // Lock not acquired: nothing to do in this pass.
        return;
    }

    try
    {
        // Query conditions for packets that were issued but never answered.
        var conditions = new List<QueryCondition>();

        // Next send time is earlier than now.
        conditions.Add(new QueryCondition()
        {
            Field = "NextSendTime",
            Operator = "<",
            IsNumber = false,
            Value = DateTime.Now
        });

        // Fewer than 3 send attempts so far.
        conditions.Add(new QueryCondition()
        {
            Field = "SendNum",
            Operator = "<",
            IsNumber = true,
            Value = 3
        });

        // Already published.
        conditions.Add(new QueryCondition()
        {
            Field = "IsSend",
            Operator = "=",
            IsNumber = false,
            Value = true
        });

        // No response received.
        conditions.Add(new QueryCondition()
        {
            Field = "IsReceived",
            Operator = "=",
            IsNumber = false,
            Value = false
        });

        // TODO: the Kafka dispatch below is currently disabled, so the conditions are not consumed yet.
        //await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
        //{
        //    TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        //    PageIndex = 1,
        //    PageSize = pageSize,
        //    Conditions = conditions,
        //});
    }
    finally
    {
        // Fixed: the lock is released exactly once on every path. The previous catch block
        // dereferenced tryLock even when it was null and could unlock twice.
        tryLock.Unlock();
    }
}
2025-03-18 15:58:37 +08:00
#endregion
#region 水 表 采 集 处 理
/// <summary>
/// Gets the water meter information list.
/// </summary>
/// <param name="gatherCode">Code of the collection endpoint.</param>
/// <returns>The water meters; must be supplied by a system-specific override.</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
/// <summary>
/// Initializes the water meter cache: the per-frequency task schedule, the meter information
/// cache, and the device-group balance control.
/// </summary>
/// <param name="gatherCode">Code of the collection endpoint.</param>
/// <returns>A task that completes when the cache has been initialized.</returns>
/// <exception cref="NullReferenceException">
/// Thrown when no water meters or no gather-item type data are available.
/// </exception>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
    var meterInfos = await GetWatermeterInfoList(gatherCode);
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
    }

    // Gather-item type data must exist before the cache is built.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
    }

    // Device ids drive the distribution of data across Kafka topic partitions.
    List<string> deviceIds = new List<string>();

    // Group by collection frequency; materialized because the groups are walked twice.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();

    var currentTime = DateTime.Now;
    if (!_applicationOptions.FirstCollectionTime.HasValue)
    {
        _applicationOptions.FirstCollectionTime = currentTime;
    }

    // First write the task schedule for every collection frequency.
    foreach (var item in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
            TimeDensity = item.Key,
        };
        nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);

        // TODO: time points missed between the first collection time and now could be tracked in IoTDB.
        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        // Fixed: these keys used MeterTypeEnum.Ammeter, which wrote water meters into the
        // ammeter cache and did not match the WaterMeter task key above.
        var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";

        List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();

        // Walk the meters concentrator by concentrator; meters without a concentrator address are skipped.
        foreach (var item in itemTimeDensity.GroupBy(x => x.FocusAddress))
        {
            if (string.IsNullOrWhiteSpace(item.Key))
            {
                continue;
            }

            foreach (var subItem in item)
            {
                deviceIds.Add(subItem.MeterId.ToString());
                watermeterInfo.Add(subItem);
            }
        }

        // Insert once per frequency. The list was previously re-inserted, still growing,
        // after every concentrator group.
        if (watermeterInfo.Count > 0)
        {
            await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
                redisCacheMeterInfoHashKey,
                redisCacheMeterInfoSetIndexKey,
                redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
        }
    }

    // Initialize the device-group balance control.
    if (deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitWatermeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    _logger.LogInformation($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据完成");
}
2025-03-18 15:58:37 +08:00
/// <summary>
/// Scheduled water meter data collection.
/// </summary>
/// <returns>A task that completes when the collection pass has finished.</returns>
public virtual async Task WatermeterScheduledMeterAutoReadding()
{
    // Water meters currently have a single collection frequency: 60 minutes.
    int timeDensity = 60;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null)
    {
        // Fixed: the log previously named the ammeter fifteen-minute reading method.
        _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
        return;
    }

    var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();

    // Select the packets whose pending read time matches the last task time.
    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // TODO: the Kafka dispatch below is currently disabled, so the conditions are not consumed yet.
    //_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
    //{
    //    TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
    //    PageIndex = 1,
    //    PageSize = pageSize,
    //    Conditions = conditions,
    //});

    _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟采集水表数据处理完成");
}
2025-04-01 22:50:34 +08:00
2025-04-23 09:42:09 +08:00
/// <summary>
/// Creates the pending collection packet for one water meter.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="watermeter">Water meter information.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Task time; stored as the packet's pending read time.</param>
/// <returns>
/// A list holding the generated packet, or null when the meter type is unsupported, no protocol
/// plug-in is found, or no frame could be built.
/// </returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
    , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;

    // Only water meters and water flow meters are supported. The type name that used to be
    // derived here was never read, so only the validation remains.
    if (watermeter.MeterType != MeterTypeEnum.WaterMeter && watermeter.MeterType != MeterTypeEnum.WaterMeterFlowmeter)
    {
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
        return null;
    }

    // Resolve the protocol plug-in from the meter's code.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
    if (protocolPlugin == null)
    {
        // Fixed: this branch used to be empty, so the BuildAsync call below threw a NullReferenceException.
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}没有找到对应的协议组件,-105");
        return null;
    }

    string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
    string subItemCode = T1882018PacketItemCodeConst.CTR0190;
    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = watermeter.FocusAddress,
        Pn = watermeter.MeteringCode,
        ItemCode = itemCode,
        SubProtocolRequest = new SubProtocolBuildRequest()
        {
            MeterAddress = watermeter.MeterAddress,
            Password = watermeter.Password,
            ItemCode = subItemCode,
        }
    });

    // One check instead of the previous two identical ones; the warning is now always logged.
    if (builderResponse == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{itemCode}未能正确获取报文。");
        return null;
    }

    string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
    var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = SystemType,
        ProjectId = $"{watermeter.ProjectID}",
        // Fixed: water meter packets were stamped as Ammeter.
        // NOTE(review): confirm that downstream readers expect WaterMeter here.
        DeviceType = $"{MeterTypeEnum.WaterMeter}",
        DeviceId = $"{watermeter.MeterId}",
        Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
        DatabaseBusiID = watermeter.DatabaseBusiID,
        PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
        PendingCopyReadTime = timestamps,
        CreationTime = currentTime,
        MeterAddress = watermeter.MeterAddress,
        AFN = builderResponse.AFn,
        Fn = builderResponse.Fn,
        Seq = builderResponse.Seq,
        MSA = builderResponse.MSA,
        ItemCode = itemCode,
        SubItemCode = subItemCode,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = watermeter.MeteringCode,
        IssuedMessageId = GuidGenerator.Create().ToString(),
        IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
        IsReceived = false,
        ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
    };

    return new List<MeterReadingTelemetryPacketInfo> { meterReadingRecords };
}
#endregion
#region 集 中 器 处 理
/// <summary>
/// Builds the packet that automatically requests the terminal (concentrator) version.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="ammeterInfo">Ammeter information; supplies the concentrator address and measuring point.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Timestamp for the collection frequency (not used by this method).</param>
/// <returns>
/// A list holding the generated packet, or null when the current minute is not the configured
/// time, no protocol plug-in is found, or no frame could be built.
/// </returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // Only run in the configured terminal-version minute.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间");
        return null;
    }

    // The metadata lookup that used to run here was never read and has been dropped.
    var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H;

    // Resolve the protocol plug-in from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号{currentTime}没有找到对应的协议组件,-105");
        return null;
    }

    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = ammeterInfo.FocusAddress,
        Pn = ammeterInfo.MeteringCode,
        ItemCode = itemCode,
    });

    // Same guard the regular collection path applies before creating a packet.
    if (builderResponse == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号{currentTime}没有任务生成,-106");
        return null;
    }

    var meterReadingRecords = CreateAmmeterPacketInfo(
        ammeterInfo: ammeterInfo,
        timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        builderResponse: builderResponse,
        itemCode: itemCode,
        subItemCode: null,
        pendingCopyReadTime: currentTime,
        creationTime: currentTime,
        packetType: TelemetryPacketTypeEnum.TerminalVersion);

    return new List<MeterReadingTelemetryPacketInfo> { meterReadingRecords };
}
/// <summary>
/// Builds the packet that automatically requests the remote communication module (SIM) version information.
/// </summary>
/// <param name="timeDensity">Collection frequency (not used by this method).</param>
/// <param name="ammeterInfo">Ammeter information; supplies the concentrator address and measuring point.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not used by this method).</param>
/// <param name="timestamps">Timestamp for the collection frequency (not used by this method).</param>
/// <returns>
/// A list holding the generated packet, or null when the current minute is not the configured
/// time, no protocol plug-in is found, or no frame could be built.
/// </returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // Runs in the same configured minute as the terminal-version job.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
        return null;
    }

    var itemCode = T37612012PacketItemCodeConst.AFN09HFN09H;

    // Resolve the protocol plug-in from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有找到对应的协议组件,-105");
        return null;
    }

    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = ammeterInfo.FocusAddress,
        Pn = ammeterInfo.MeteringCode,
        ItemCode = itemCode,
    });

    // Same guard the regular collection path applies before creating a packet.
    if (builderResponse == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有任务生成,-106");
        return null;
    }

    var meterReadingRecords = CreateAmmeterPacketInfo(
        ammeterInfo: ammeterInfo,
        timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        builderResponse: builderResponse,
        itemCode: itemCode,
        subItemCode: null,
        pendingCopyReadTime: currentTime,
        creationTime: currentTime,
        packetType: TelemetryPacketTypeEnum.TelematicsModule);

    return new List<MeterReadingTelemetryPacketInfo> { meterReadingRecords };
}
2025-03-18 15:58:37 +08:00
#endregion
#region 公 共 处 理 方 法
2025-04-14 21:56:24 +08:00
2025-03-18 15:58:37 +08:00
/// <summary>
/// Decides whether collection commands should be generated now.
/// </summary>
/// <param name="nextTaskTime">Time at which the next task is due.</param>
/// <param name="timeDensity">Number of minutes added to the current local time before the comparison; defaults to 0.</param>
/// <returns>
/// <c>true</c> when the current local time plus <paramref name="timeDensity"/> minutes
/// is at or past <paramref name="nextTaskTime"/>; otherwise <c>false</c>.
/// </returns>
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
    // Shift "now" forward by the requested minutes, then compare with the due time.
    DateTime shiftedNow = DateTime.Now.AddMinutes(timeDensity);
    return shiftedNow >= nextTaskTime;
}
2025-03-18 22:43:24 +08:00
2025-04-18 17:46:24 +08:00
/// <summary>
/// Loads every cached meter of the given type and collection frequency and runs the
/// supplied task-creation delegate for each of them.
/// </summary>
/// <typeparam name="T">Cached device model type.</typeparam>
/// <param name="timeDensity">Collection frequency (the log messages report it in minutes).</param>
/// <param name="nextTaskTime">Task time that belongs to the collection frequency; passed through to the delegate unchanged.</param>
/// <param name="meterType">Meter type.</param>
/// <param name="taskCreateAction">
/// Delegate that creates the task for a single meter. It receives the collection frequency,
/// the meter, the group index supplied by the balance control and <paramref name="nextTaskTime"/>.
/// </param>
/// <returns>A task that completes once all meters have been processed.</returns>
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
{
    var timer = Stopwatch.StartNew();

    // Redis keys of the meter cache for this system / server / meter type / frequency.
    var redisCacheMeterInfoHashKeyTemp = string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity);
    var redisCacheMeterInfoZSetScoresIndexKeyTemp = string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity);

    List<T> meterInfos = new List<T>();
    decimal? cursor = null;
    string member = null;

    // Read the cache page by page (1000 entries each), continuing from the
    // score/member position returned by the previous page.
    while (true)
    {
        var page = await _redisDataCacheService.GetAllPagedData<T>(
            redisCacheMeterInfoHashKeyTemp,
            redisCacheMeterInfoZSetScoresIndexKeyTemp,
            pageSize: 1000,
            lastScore: cursor,
            lastMember: member);

        meterInfos.AddRange(page.Items);

        if (!page.HasNext)
        {
            break;
        }

        cursor = page.NextScore;
        member = page.NextMember;
    }

    timer.Stop();

    if (meterInfos.Count <= 0)
    {
        _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
        return;
    }

    // Timing information is not an error, so it is logged at information level.
    _logger.LogInformation($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");

    // Measure the task-creation phase separately from the cache read.
    timer.Restart();

    await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
        items: meterInfos,
        deviceIdSelector: data => data.MeterId.ToString(),
        processor: (data, groupIndex) =>
        {
            taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
        }
    );

    timer.Stop();
    _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
/// <summary>
/// Reads the stored task records page by page and hands every record to
/// <c>KafkaProducerIssuedMessageAction</c> for the given topic.
/// </summary>
/// <typeparam name="T">Task record entity type.</typeparam>
/// <param name="kafkaTopicName">Kafka topic name; when null or blank the method logs and returns without doing anything.</param>
/// <param name="options">Task query conditions. <c>PageIndex</c> is overwritten on every iteration.</param>
/// <returns>A task that completes after all pages have been read and all dispatch work has finished.</returns>
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
{
    if (string.IsNullOrWhiteSpace(kafkaTopicName))
    {
        _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空, -101");
        return;
    }

    // NOTE(review): paging starts at index 103, which looks like a debugging leftover.
    // Confirm the intended first page index before changing it.
    int pageNumber = 103;
    bool hasNext;

    // Dispatch work started for each page. It keeps running while the next page is read
    // and is awaited as a whole before completion is reported.
    var dispatchTasks = new List<Task>();
    var stopwatch = Stopwatch.StartNew();

    do
    {
        var stopwatch2 = Stopwatch.StartNew();

        options.PageIndex = pageNumber++;
        var pageResult = await _dbProvider.QueryAsync<T>(options);
        hasNext = pageResult.HasNext;

        dispatchTasks.Add(DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
            items: pageResult.Items.ToList(),
            deviceIdSelector: data => data.DeviceId,
            processor: (data, groupIndex) =>
            {
                // The processor callback returns no value, so the push task cannot be awaited here.
                _ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
            }
        ));

        stopwatch2.Stop();
        _logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
    } while (hasNext);

    // Wait for every page's dispatch to finish so the total time below is accurate
    // and failures inside the dispatch are not silently lost.
    await Task.WhenAll(dispatchTasks);

    stopwatch.Stop();
    _logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
/// Pushes a task record to Kafka. Currently only the arguments are validated;
/// the actual produce call is commented out, so a valid call does nothing.
/// </summary>
/// <typeparam name="T">Type of the task record.</typeparam>
/// <param name="topicName">Topic name.</param>
/// <param name="taskRecord">Task record.</param>
/// <param name="partition">Target partition, i.e. the index of the group the concentrator number belongs to.</param>
/// <returns>A task that is faulted when <paramref name="topicName"/> is blank or <paramref name="taskRecord"/> is null.</returns>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
    T taskRecord, int partition) where T : class
{
    bool topicMissing = string.IsNullOrWhiteSpace(topicName);
    bool recordMissing = taskRecord == null;

    if (topicMissing || recordMissing)
    {
        throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
    }

    // Publishing through the data channel is currently disabled:
    // await _dataChannelManage.ProduceAsync<T>(topicName, taskRecord, partition);
}
2025-04-27 17:27:04 +08:00
/// <summary>
/// Builds the record that stores a generated telemetry packet for an ammeter.
/// </summary>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="timestamps">Timestamp used for IoTDB storage.</param>
/// <param name="builderResponse">Result returned by the packet builder.</param>
/// <param name="itemCode">Collection item code of the device-to-cloud protocol.</param>
/// <param name="subItemCode">Collection item code of the device-to-device protocol.</param>
/// <param name="pendingCopyReadTime">Time at which the reading is due; only scheduled collection uses a special value, other callers pass the current time.</param>
/// <param name="creationTime">Creation time of the record.</param>
/// <param name="packetType">Packet type.</param>
/// <returns>A new <c>MeterReadingTelemetryPacketInfo</c> filled from the arguments, marked as not sent, not received and not manual.</returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
{
    // Values that are used both for the task mark and for the record itself.
    var afn = builderResponse.AFn;
    var fn = builderResponse.Fn;
    var msa = builderResponse.MSA;
    var seq = builderResponse.Seq;
    var pn = ammeterInfo.MeteringCode;
    var focusAddress = ammeterInfo.FocusAddress;

    string taskMark = CommonHelper.GetTaskMark(afn, fn, pn, msa, seq);

    // The score value is the MD5 of "<concentrator address>.<task mark>".
    string scoreSource = $"{focusAddress}.{taskMark}";

    var packetInfo = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = SystemType,
        ProjectId = $"{ammeterInfo.ProjectID}",
        DeviceType = $"{MeterTypeEnum.Ammeter}",
        DeviceId = $"{ammeterInfo.MeterId}",
        Timestamps = timestamps,
        DatabaseBusiID = ammeterInfo.DatabaseBusiID,
        PendingCopyReadTime = pendingCopyReadTime,
        CreationTime = creationTime,
        MeterAddress = ammeterInfo.AmmerterAddress,
        PacketType = (int)packetType,
        AFN = afn,
        Fn = fn,
        Seq = seq,
        MSA = msa,
        FocusId = ammeterInfo.FocusId,
        FocusAddress = focusAddress,
        ItemCode = itemCode,
        SubItemCode = subItemCode,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = pn,
        IssuedMessageId = GuidGenerator.Create().ToString(),
        IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
        IsReceived = false,
        ScoreValue = scoreSource.Md5Fun(),
    };

    return packetInfo;
}
2025-03-14 14:24:38 +08:00
#endregion
2025-04-09 23:11:36 +08:00
2025-03-14 14:24:38 +08:00
}
}