2025-04-23 16:17:29 +08:00
|
|
|
|
using JiShe.CollectBus.Application.Contracts;
|
2025-04-25 09:28:20 +08:00
|
|
|
|
using JiShe.CollectBus.Common;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Consts;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
using JiShe.CollectBus.Common.DeviceBalanceControl;
|
2025-04-22 22:11:55 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Encrypt;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Enums;
|
2025-03-14 17:28:58 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Extensions;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Helpers;
|
2025-04-16 17:36:46 +08:00
|
|
|
|
using JiShe.CollectBus.Common.Models;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
using JiShe.CollectBus.GatherItem;
|
2025-04-21 22:57:49 +08:00
|
|
|
|
using JiShe.CollectBus.IoTDB.Context;
|
2025-04-18 11:31:23 +08:00
|
|
|
|
using JiShe.CollectBus.IoTDB.Interface;
|
2025-04-22 16:44:47 +08:00
|
|
|
|
using JiShe.CollectBus.IoTDB.Model;
|
|
|
|
|
|
using JiShe.CollectBus.IoTDB.Options;
|
|
|
|
|
|
using JiShe.CollectBus.IoTDB.Provider;
|
2025-04-23 16:17:29 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
2025-03-18 22:43:24 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
2025-03-14 14:38:08 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
2025-04-19 14:00:13 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Internal;
|
2025-04-15 15:49:51 +08:00
|
|
|
|
using JiShe.CollectBus.Kafka.Producer;
|
2025-04-24 23:39:39 +08:00
|
|
|
|
using JiShe.CollectBus.Protocol.Contracts.Models;
|
2025-04-24 17:48:20 +08:00
|
|
|
|
using JiShe.CollectBus.Protocol.Contracts.SendData;
|
2025-04-25 14:23:06 +08:00
|
|
|
|
using JiShe.CollectBus.Protocol.Interfaces;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
using Microsoft.Extensions.Logging;
|
2025-04-17 13:54:18 +08:00
|
|
|
|
using Microsoft.Extensions.Options;
|
2025-04-09 23:11:36 +08:00
|
|
|
|
using System;
|
|
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
|
using System.Diagnostics;
|
|
|
|
|
|
using System.Linq;
|
|
|
|
|
|
using System.Threading.Tasks;
|
2025-03-14 14:24:38 +08:00
|
|
|
|
|
2025-03-14 14:38:08 +08:00
|
|
|
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
2025-03-14 14:24:38 +08:00
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 定时采集服务
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
|
|
|
|
|
|
{
|
|
|
|
|
|
// Injected collaborators; all of them are assigned once in the constructor.
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly IIoTDbProvider _dbProvider;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;

// Option snapshots unwrapped from IOptions<T> in the constructor.
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;

/// <summary>
/// Page size passed to the IoTDB queries issued by the scheduled reading methods.
/// </summary>
private int pageSize = 3000;
|
2025-04-14 16:41:41 +08:00
|
|
|
|
|
2025-03-17 14:23:48 +08:00
|
|
|
|
/// <summary>
/// Stores the injected services and option values, then switches the IoTDB runtime
/// context to the table session pool.
/// </summary>
/// <param name="logger">Logger used by this service.</param>
/// <param name="producerService">Producer service stored for later use.</param>
/// <param name="redisDataCacheService">Redis data cache service used for the meter caches.</param>
/// <param name="dbProvider">IoTDB provider used to read metadata and insert task records.</param>
/// <param name="runtimeContext">IoTDB runtime context; its <c>UseTableSessionPool</c> flag is set to <c>true</c> here.</param>
/// <param name="protocolService">Service used to resolve protocol plugins.</param>
/// <param name="kafkaOptions">Kafka options; only the unwrapped <c>Value</c> is kept.</param>
/// <param name="applicationOptions">Server application options; only the unwrapped <c>Value</c> is kept.</param>
public BasicScheduledMeterReadingService(
    ILogger<BasicScheduledMeterReadingService> logger,
    IProducerService producerService,
    IRedisDataCacheService redisDataCacheService,
    IIoTDbProvider dbProvider,
    IoTDBRuntimeContext runtimeContext,
    IProtocolService protocolService,
    IOptions<KafkaOptionConfig> kafkaOptions,
    IOptions<ServerApplicationOptions> applicationOptions)
{
    // Assignments follow the parameter order.
    _logger = logger;
    _producerService = producerService;
    _redisDataCacheService = redisDataCacheService;
    _dbProvider = dbProvider;
    _runtimeContext = runtimeContext;
    _protocolService = protocolService;
    _kafkaOptions = kafkaOptions.Value;
    _applicationOptions = applicationOptions.Value;

    // This service always works through the table session pool.
    _runtimeContext.UseTableSessionPool = true;
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// System type identifier; it is embedded in the Redis cache keys built by this class.
/// </summary>
public abstract string SystemType { get; }

/// <summary>
/// Deployment tag of the application server; it is embedded in the Redis cache keys
/// built by this class.
/// </summary>
public abstract string ServerTagName { get; }
|
|
|
|
|
|
|
2025-03-14 14:24:38 +08:00
|
|
|
|
/// <summary>
/// Ammeter daily-freeze collection item codes.
/// </summary>
protected List<string> DayFreezeCodes = new List<string>
{
    "0D_3",
    "0D_4",
    "0D_161",
    "0D_162",
    "0D_163",
    "0D_164",
    "0D_165",
    "0D_166",
    "0D_167",
    "0D_168",
    "0C_149"
};

/// <summary>
/// Ammeter monthly-freeze collection item codes.
/// </summary>
protected List<string> MonthFreezeCodes = new List<string>
{
    "0D_177",
    "0D_178",
    "0D_179",
    "0D_180",
    "0D_181",
    "0D_182",
    "0D_183",
    "0D_184",
    "0D_193",
    "0D_195"
};
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Returns the list of collection items. The base implementation always throws;
/// each system type is expected to override it.
/// </summary>
/// <returns>The collection item list (overrides only).</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
    => throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
|
|
|
|
|
|
|
2025-03-18 15:58:37 +08:00
|
|
|
|
/// <summary>
/// Builds the pending command-issuing tasks for every task entry cached in Redis for
/// this system type and server tag.
/// </summary>
/// <remarks>
/// For each cached task key the method:
/// <list type="number">
/// <item>loads the <c>TasksToBeIssueModel</c> and skips the key when it is missing;</item>
/// <item>parses meter type and collection frequency from the key (frequencies above 15 are capped at 15);</item>
/// <item>for ammeter keys, schedules the automatic time-verification broadcast when the current
/// minute equals the configured <c>AutomaticVerificationTime</c>;</item>
/// <item>when the entry is due according to <c>IsTaskTime</c>, starts the publish task for the meter
/// type and advances <c>LastTaskTime</c>/<c>NextTaskTime</c> in Redis.</item>
/// </list>
/// After the loop the ammeter automatic valve-control task is started.
/// Publish tasks, inserts and the valve-control task are started without being awaited,
/// so this method does not observe their failures.
/// NOTE(review): the verification condition holds for a whole minute, so a scheduler that runs
/// more often than once a minute schedules the broadcast on each run in that minute.
/// </remarks>
/// <returns>A task that completes once every cached task entry has been examined.</returns>
public virtual async Task CreateToBeIssueTasks()
{
    var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
    var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
    if (taskInfos == null || taskInfos.Length <= 0)
    {
        _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
        return;
    }

    var currentTime = DateTime.Now;

    // Ammeter broadcast time verification. The check is evaluated once per run and must
    // never abort the run: previously a non-matching minute returned from the whole method,
    // which suppressed all regular reading tasks and the valve-control task below.
    string currentTimeStr = $"{currentTime:HH:mm:00}";
    bool isAutomaticVerificationTime = string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase);
    if (!isAutomaticVerificationTime)
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
    }

    // Scheduled reading.
    foreach (var item in taskInfos)
    {
        var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
        if (tasksToBeIssueModel == null)
        {
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
            continue;
        }

        // item is the Redis key produced from CacheTasksToBeIssuedKey:
        // [0]=>CollectBus, [1]=>SystemTypeConst, [2]=>ServerTagName, [3]=>TaskInfo, [4]=>meter type, [5]=>collection frequency
        var keySegments = item.Split(":");
        if (keySegments.Length < 6 || !int.TryParse(keySegments[5], out int timeDensity))
        {
            // A malformed key must not break the processing of the remaining keys.
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}格式无效,-104");
            continue;
        }

        string meteryType = keySegments[4]; // meter type
        if (timeDensity > 15)
        {
            timeDensity = 15;
        }

        // The verification broadcast targets the ammeter cache, so it is only scheduled
        // for ammeter keys; scheduling it for other meter types would repeat the same work.
        if (isAutomaticVerificationTime && meteryType == MeterTypeEnum.Ammeter.ToString())
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    await AmmeterScheduledAutomaticVerificationTime(density, meter, groupIndex, timestamps);
                });
        }

        // The job runs more often than the collection frequency, so skip entries whose
        // next task time is not within the current time window.
        if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
        {
            _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
            continue;
        }

        // NextTaskTime was already aligned to the next collection point when the meter cache was initialised.
        var currentTaskTime = tasksToBeIssueModel.NextTaskTime;

        var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
        if (meteryType == MeterTypeEnum.Ammeter.ToString())
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmerterCreatePublishTaskAction(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"电表 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }
                    _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
                });
        }
        else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
        {
            _ = CreateMeterPublishTask<WatermeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.WaterMeter,
                taskCreateAction: async (density, meter, groupIndex, timestamps) =>
                {
                    var tempTask = await WatermeterCreatePublishTaskAction(density, meter, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"水表 {meter.Name} 任务数据构建失败:{meter.Serialize()}");
                        return;
                    }
                    _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
                });
        }
        else
        {
            _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
        }

        _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");

        // Advance the schedule here, independent of the publish tasks, so that a failure
        // while processing a task cannot prevent the next task point from being created.
        tasksToBeIssueModel.LastTaskTime = currentTaskTime;
        tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
        await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
    }

    // Ammeter scheduled valve-control task.
    _ = AmmeterScheduledAutoValveControl();
}
|
|
|
|
|
|
|
2025-03-14 14:24:38 +08:00
|
|
|
|
#region 电表采集处理
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Returns the ammeter information list. The base implementation always throws;
/// each system type is expected to override it.
/// </summary>
/// <param name="gatherCode">Collector code; defaults to an empty string.</param>
/// <returns>The ammeter list (overrides only).</returns>
/// <exception cref="NotImplementedException">Always thrown by this base implementation.</exception>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Initialises the ammeter cache data in Redis.
/// </summary>
/// <remarks>
/// Steps performed:
/// <list type="number">
/// <item>load the ammeters via <c>GetAmmeterInfoList</c> and the collection items via
/// <c>GetGatherItemByDataTypes</c>;</item>
/// <item>write one <c>TasksToBeIssueModel</c> per collection frequency, with <c>NextTaskTime</c>
/// derived from the configured first collection time (set to the current time when not configured);</item>
/// <item>for each frequency, derive <c>ItemCodes</c> from <c>DataTypes</c> for ammeters that have no
/// item codes yet and batch-insert the ammeters into the Redis meter caches;</item>
/// <item>initialise the device group balance control with the collected meter ids.</item>
/// </list>
/// Ammeters whose concentrator address is empty are skipped.
/// </remarks>
/// <param name="gatherCode">Collector code forwarded to <c>GetAmmeterInfoList</c>.</param>
/// <returns>A task that completes when the caches have been written.</returns>
/// <exception cref="NullReferenceException">
/// Thrown when no ammeters or no collection items are returned (type kept for existing callers).
/// </exception>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
    // Do not delete the commented-out code below.
    //#if DEBUG
    //            var timeDensity = "15";
    //            var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
    //            var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
    //            var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";

    //            List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
    //            List<string> focusAddressDataLista = new List<string>();
    //            var timer1 = Stopwatch.StartNew();

    //            var allIds = new HashSet<string>();
    //            decimal? score = null;
    //            string member = null;

    //            while (true)
    //            {
    //                var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
    //                    redisCacheMeterInfoHashKeyTemp,
    //                    redisCacheMeterInfoZSetScoresIndexKeyTemp,
    //                    pageSize: 1000,
    //                    lastScore: score,
    //                    lastMember: member);

    //                meterInfos.AddRange(page.Items);
    //                focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
    //                foreach (var item in page.Items)
    //                {
    //                    if (!allIds.Add(item.MemberId))
    //                    {
    //                        _logger.LogError($"{item.MemberId}Duplicate data found!");
    //                    }
    //                }
    //                if (!page.HasNext) break;
    //                score = page.NextScore;
    //                member = page.NextMember;
    //            }


    //            timer1.Stop();
    //            _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
    //            DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
    //            return;
    //#else
    //            var meterInfos = await GetAmmeterInfoList(gatherCode);
    //#endif
    var meterInfos = await GetAmmeterInfoList(gatherCode);
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
    }

    // Load the collection item definitions.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
    }
    var timer = Stopwatch.StartNew();

    // Meter ids used to distribute the Kafka topic partition workload.
    List<string> deviceIds = new List<string>();

    // TODO: item codes excluded from transparent forwarding (peak/valley periods, trip/close) for special meters.
    // Matched as whole codes; a substring test against "10_98,10_94" would also hit partial codes such as "10_9".
    var excludedItemCodes = new HashSet<string> { "10_98", "10_94" };

    // Group by collection frequency. Materialised once because the groups are walked twice below.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();

    if (_applicationOptions.FirstCollectionTime.HasValue == false)
    {
        _applicationOptions.FirstCollectionTime = DateTime.Now;
    }

    // First write the per-frequency task cache entries.
    foreach (var item in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = null,
            TimeDensity = item.Key,
            // The first collection time is the base for the next collection time.
            NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
        };

        // TODO: collection points missed between the first collection time and the current run
        // could be handled through IoTDB time-series storage.

        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        var redisCacheMeterInfoHashKey = string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
        var redisCacheMeterInfoSetIndexKey = string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
        var redisCacheMeterInfoZSetScoresIndexKey = string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);

        List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();

        // Group the meters by concentrator address.
        var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
        foreach (var item in meterInfoGroup)
        {
            // Skip meters without a concentrator address.
            if (string.IsNullOrWhiteSpace(item.Key))
            {
                continue;
            }

            foreach (var ammeter in item)
            {
                deviceIds.Add(ammeter.MeterId.ToString());

                // Derive ItemCodes from DataTypes when the meter has none.
                if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
                {
                    var itemArr = ammeter.DataTypes.Split(',').ToList();

                    #region Build the collection item codes
                    List<string> itemCodeList = new List<string>();
                    foreach (var dataType in itemArr)
                    {
                        var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
                        if (gatherItem == null
                            || string.IsNullOrWhiteSpace(gatherItem.ItemCode)
                            || excludedItemCodes.Contains(gatherItem.ItemCode))
                        {
                            continue;
                        }

                        itemCodeList.Add(gatherItem.ItemCode);
                    }

                    #region Special ammeter item codes
                    // Added once per meter, after the loop: inside the loop they were appended
                    // once for every data type, producing duplicate item codes.
                    if (itemArr.Contains("95") && !itemCodeList.Contains("10_95")) // Delixi DTS
                    {
                        itemCodeList.Add("10_95");
                    }
                    if (itemArr.Contains("109") && !itemCodeList.Contains("10_109")) // WAVE_109
                    {
                        itemCodeList.Add("10_109");
                    }
                    #endregion
                    #endregion

                    // Stored as a JSON string.
                    ammeter.ItemCodes = itemCodeList.Serialize();

                    if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
                    {
                        ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
                    }
                }

                ammeterInfos.Add(ammeter);
            }
        }

        await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
            redisCacheMeterInfoHashKey,
            redisCacheMeterInfoSetIndexKey,
            redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
    }

    // Initialise the device group load balancing.
    if (deviceIds == null || deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    timer.Stop();

    _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// One-minute ammeter reading: looks up the task records already stored for the last
/// one-minute task time and hands them to the Kafka issuing step. It does not build tasks.
/// </summary>
/// <remarks>
/// The last task time is read from the Redis task entry of the one-minute collection frequency.
/// When that entry is missing or has no <c>LastTaskTime</c>, an error is logged and nothing is issued.
/// The Kafka step is started without being awaited.
/// </remarks>
/// <returns>A task that completes once the Kafka issuing step has been started (or skipped).</returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
    // Must be the one-minute frequency: with 5 this method read the five-minute task entry
    // and pushed its data to the one-minute topic.
    int timeDensity = 1;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Records are selected by their pending read time, expressed in Unix nanoseconds.
    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();

    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Five-minute ammeter reading: looks up the task records already stored for the last
/// five-minute task time and hands them to the Kafka issuing step.
/// </summary>
/// <remarks>
/// The last task time is read from the Redis task entry of the five-minute collection frequency.
/// When that entry is missing or has no <c>LastTaskTime</c>, an error is logged and nothing is issued.
/// The Kafka step is started without being awaited.
/// </remarks>
/// <returns>A task that completes once the Kafka issuing step has been started (or skipped).</returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{
    int timeDensity = 5;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        // Log under this method's own name; it previously reported the fifteen-minute method.
        _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Records are selected by their pending read time, expressed in Unix nanoseconds.
    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();

    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Fifteen-minute ammeter reading: looks up the task records already stored for the last
/// fifteen-minute task time and hands them to the Kafka issuing step.
/// </summary>
/// <remarks>
/// When the Redis task entry is missing or has no <c>LastTaskTime</c>, an error is logged and
/// nothing is issued. The Kafka step is started without being awaited.
/// </remarks>
/// <returns>A task that completes once the Kafka issuing step has been started (or skipped).</returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
    int timeDensity = 15;
    var taskKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var cachedTask = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(taskKey);

    bool hasLastTaskTime = cachedTask != null && cachedTask.LastTaskTime.HasValue;
    if (!hasLastTaskTime)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Records are selected by their pending read time, expressed in Unix nanoseconds.
    var lastTaskNanoseconds = cachedTask.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();

    var filters = new List<QueryCondition>
    {
        new QueryCondition()
        {
            Field = "PendingCopyReadTime",
            Operator = "=",
            IsNumber = true,
            Value = lastTaskNanoseconds
        }
    };

    var queryOptions = new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = filters,
    };

    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, queryOptions);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
2025-04-18 17:46:24 +08:00
|
|
|
|
/// 创建电表待发送的任务数据
|
2025-03-14 14:24:38 +08:00
|
|
|
|
/// </summary>
|
2025-03-18 15:58:37 +08:00
|
|
|
|
/// <param name="timeDensity">采集频率</param>
|
2025-04-18 17:46:24 +08:00
|
|
|
|
/// <param name="ammeterInfo">电表信息</param>
|
2025-04-17 11:29:26 +08:00
|
|
|
|
/// <param name="groupIndex">集中器所在分组</param>
|
2025-04-21 22:57:49 +08:00
|
|
|
|
/// <param name="timestamps">采集频率对应的时间戳</param>
|
2025-03-14 14:24:38 +08:00
|
|
|
|
/// <returns></returns>
|
2025-04-24 17:48:20 +08:00
|
|
|
|
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
|
|
|
|
|
var currentTime = DateTime.Now;
|
2025-04-18 11:31:23 +08:00
|
|
|
|
|
2025-04-24 17:48:20 +08:00
|
|
|
|
//根据电表型号获取协议插件
|
|
|
|
|
|
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
|
|
|
|
|
|
if (protocolPlugin == null)
|
|
|
|
|
|
{
|
|
|
|
|
|
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
|
|
|
|
|
|
//return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-04-21 22:57:49 +08:00
|
|
|
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
2025-04-15 09:43:51 +08:00
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
|
|
|
|
|
|
{
|
2025-04-23 09:45:21 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//载波的不处理
|
|
|
|
|
|
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
|
|
|
|
|
|
{
|
2025-04-23 09:42:09 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
2025-04-14 17:38:34 +08:00
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (ammeterInfo.State.Equals(2))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-23 09:42:09 +08:00
|
|
|
|
_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
|
|
|
|
|
|
//if (!IsGennerateCmd(ammeter.LastTime, -1))
|
|
|
|
|
|
//{
|
|
|
|
|
|
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令");
|
|
|
|
|
|
// continue;
|
|
|
|
|
|
//}
|
2025-04-14 17:38:34 +08:00
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
|
|
|
|
|
|
{
|
2025-04-23 09:45:21 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
|
|
|
|
|
|
{
|
2025-04-23 09:42:09 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
|
|
|
|
|
|
{
|
2025-04-23 09:45:21 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
|
|
|
|
|
|
{
|
2025-04-23 09:42:09 +08:00
|
|
|
|
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
|
|
|
|
|
|
|
|
|
|
|
|
//TODO:自动上报数据只主动采集1类数据。
|
|
|
|
|
|
if (ammeterInfo.AutomaticReport.Equals(1))
|
|
|
|
|
|
{
|
|
|
|
|
|
var tempSubCodes = new List<string>();
|
|
|
|
|
|
if (tempCodes.Contains("0C_49"))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-14 21:56:24 +08:00
|
|
|
|
tempSubCodes.Add("0C_49");
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (tempSubCodes.Contains("0C_149"))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-14 21:56:24 +08:00
|
|
|
|
tempSubCodes.Add("0C_149");
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (ammeterInfo.ItemCodes.Contains("10_97"))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-14 21:56:24 +08:00
|
|
|
|
tempSubCodes.Add("10_97");
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-14 21:56:24 +08:00
|
|
|
|
if (tempSubCodes == null || tempSubCodes.Count <= 0)
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-15 15:49:51 +08:00
|
|
|
|
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return null;
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
2025-04-14 21:56:24 +08:00
|
|
|
|
else
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
2025-04-14 21:56:24 +08:00
|
|
|
|
tempCodes = tempSubCodes;
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
2025-04-14 21:56:24 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-17 11:29:26 +08:00
|
|
|
|
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
2025-04-14 21:56:24 +08:00
|
|
|
|
|
|
|
|
|
|
foreach (var tempItem in tempCodes)
|
|
|
|
|
|
{
|
|
|
|
|
|
//排除已发送日冻结和月冻结采集项配置
|
|
|
|
|
|
if (DayFreezeCodes.Contains(tempItem))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
2025-04-14 21:56:24 +08:00
|
|
|
|
|
|
|
|
|
|
if (MonthFreezeCodes.Contains(tempItem))
|
2025-04-14 17:38:34 +08:00
|
|
|
|
{
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-24 17:48:20 +08:00
|
|
|
|
//var itemCodeArr = tempItem.Split('_');
|
|
|
|
|
|
//var aFNStr = itemCodeArr[0];
|
|
|
|
|
|
//var aFN = (AFN)aFNStr.HexToDec();
|
|
|
|
|
|
//var fn = int.Parse(itemCodeArr[1]);
|
2025-04-14 17:38:34 +08:00
|
|
|
|
|
2025-04-24 17:48:20 +08:00
|
|
|
|
//TODO:特殊表
|
2025-04-24 23:39:39 +08:00
|
|
|
|
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
|
2025-04-24 17:48:20 +08:00
|
|
|
|
{
|
|
|
|
|
|
FocusAddress = ammeterInfo.FocusAddress,
|
|
|
|
|
|
Pn = ammeterInfo.MeteringCode,
|
|
|
|
|
|
ItemCode = tempItem,
|
|
|
|
|
|
});
|
2025-04-18 11:31:23 +08:00
|
|
|
|
if (builderResponse == null || builderResponse.Data.Length <= 0)
|
2025-04-14 21:56:24 +08:00
|
|
|
|
{
|
2025-04-15 15:49:51 +08:00
|
|
|
|
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
|
2025-04-14 21:56:24 +08:00
|
|
|
|
continue;
|
|
|
|
|
|
}
|
2025-04-14 17:38:34 +08:00
|
|
|
|
|
|
|
|
|
|
|
2025-04-24 17:48:20 +08:00
|
|
|
|
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
|
2025-04-17 11:29:26 +08:00
|
|
|
|
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
2025-04-14 21:56:24 +08:00
|
|
|
|
{
|
2025-04-21 22:57:49 +08:00
|
|
|
|
SystemName = SystemType,
|
|
|
|
|
|
ProjectId = $"{ammeterInfo.ProjectID}",
|
|
|
|
|
|
DeviceType = $"{MeterTypeEnum.Ammeter}",
|
2025-04-23 11:13:59 +08:00
|
|
|
|
DeviceId = $"{ammeterInfo.MeterId}",
|
2025-04-22 16:44:47 +08:00
|
|
|
|
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
|
2025-04-14 21:56:24 +08:00
|
|
|
|
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
2025-04-21 22:57:49 +08:00
|
|
|
|
PendingCopyReadTime = timestamps,
|
2025-04-14 21:56:24 +08:00
|
|
|
|
CreationTime = currentTime,
|
|
|
|
|
|
MeterAddress = ammeterInfo.AmmerterAddress,
|
2025-04-24 17:48:20 +08:00
|
|
|
|
AFN = builderResponse.AFn,
|
|
|
|
|
|
Fn = builderResponse.Fn,
|
|
|
|
|
|
Seq = builderResponse.Seq,
|
2025-04-18 11:31:23 +08:00
|
|
|
|
MSA = builderResponse.MSA,
|
2025-04-14 21:56:24 +08:00
|
|
|
|
ItemCode = tempItem,
|
2025-04-22 22:11:55 +08:00
|
|
|
|
TaskMark = taskMark,
|
2025-04-18 11:31:23 +08:00
|
|
|
|
IsSend = false,
|
2025-04-14 21:56:24 +08:00
|
|
|
|
ManualOrNot = false,
|
|
|
|
|
|
Pn = ammeterInfo.MeteringCode,
|
|
|
|
|
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
2025-04-18 11:31:23 +08:00
|
|
|
|
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
|
|
|
|
|
|
IsReceived = false,
|
2025-04-22 22:11:55 +08:00
|
|
|
|
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
|
2025-04-14 21:56:24 +08:00
|
|
|
|
};
|
|
|
|
|
|
|
2025-04-17 11:29:26 +08:00
|
|
|
|
taskList.Add(meterReadingRecords);
|
2025-04-14 17:38:34 +08:00
|
|
|
|
}
|
2025-04-15 09:43:51 +08:00
|
|
|
|
|
2025-04-21 22:57:49 +08:00
|
|
|
|
return taskList;
|
2025-04-15 15:49:51 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-23 16:17:29 +08:00
|
|
|
|
/// <summary>
/// Gets the automatic valve-control settings for ammeters.
/// </summary>
/// <param name="currentTime">The valve-control time.</param>
/// <returns>Nothing in this base implementation; it always throws.</returns>
/// <exception cref="NotImplementedException">
/// Always thrown here; the message asks for an implementation per system type.
/// </exception>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
    // The message used to name GetAmmeterInfoList (copy-paste), which pointed
    // readers of the exception at the wrong member.
    throw new NotImplementedException($"{nameof(GetAmmeterAutoValveControlSetting)}请根据不同系统类型进行实现");
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Scheduled automatic valve control for ammeters.
/// </summary>
/// <returns>Nothing in this base implementation; it always throws.</returns>
/// <exception cref="NotImplementedException">
/// Always thrown here; the message asks for an implementation per system type.
/// </exception>
public virtual Task AmmeterScheduledAutoValveControl()
    => throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
|
|
|
|
|
|
|
2025-04-25 12:01:15 +08:00
|
|
|
|
/// <summary>
/// Builds the automatic time-verification (clock sync) command for one ammeter,
/// stores the resulting task record and starts pushing it to Kafka.
/// </summary>
/// <param name="timeDensity">Collection frequency (not read by this method).</param>
/// <param name="ammeterInfo">The ammeter the command is built for.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not read by this method).</param>
/// <param name="timestamps">Timestamp matching the collection frequency (not read by this method).</param>
/// <returns>
/// A task that completes after the record has been inserted. The Kafka push is
/// started but not awaited.
/// </returns>
public virtual async Task AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // Only proceed when the current minute equals the configured verification time.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
        return;
    }

    var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();

    var temCode = "10_01";

    // Resolve the protocol plugin from the meter model.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        // Previously logged under the valve-control method name and wording.
        _logger.LogError($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 自动校时运行时间{currentTime}没有找到对应的协议组件,-105");
        return;
    }

    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = ammeterInfo.FocusAddress,
        Pn = ammeterInfo.MeteringCode,
        ItemCode = temCode,
        SubProtocolRequest = new SubProtocolBuildRequest()
        {
            MeterAddress = ammeterInfo.AmmerterAddress,
            Password = ammeterInfo.Password,
            ItemCode = T645PacketItemCodeConst.C08,
        }
    });

    // Same guard the other task builders in this file apply; without it a
    // missing response fails below with a NullReferenceException.
    if (builderResponse == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogWarning($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{temCode}未能正确获取报文。");
        return;
    }

    string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
    var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = SystemType,
        ProjectId = $"{ammeterInfo.ProjectID}",
        DeviceType = $"{MeterTypeEnum.Ammeter}",
        DeviceId = $"{ammeterInfo.MeterId}",
        Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        DatabaseBusiID = ammeterInfo.DatabaseBusiID,
        PendingCopyReadTime = currentTime,
        CreationTime = currentTime,
        MeterAddress = ammeterInfo.AmmerterAddress,
        AFN = builderResponse.AFn,
        Fn = builderResponse.Fn,
        Seq = builderResponse.Seq,
        MSA = builderResponse.MSA,
        ItemCode = temCode,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = ammeterInfo.MeteringCode,
        IssuedMessageId = GuidGenerator.Create().ToString(),
        IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
        IsReceived = false,
        ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
    };

    // Exactly one record is produced per call, so the former "list is empty"
    // branch could never run and has been removed.
    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>() { meterReadingRecords };

    // Store the task record.
    await _dbProvider.BatchInsertAsync(metadata, taskList);

    // Push the task to Kafka. Not awaited, as before.
    // NOTE(review): a failure inside this call is never observed - confirm that is intended.
    _ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
        items: taskList,
        deviceIdSelector: data => data.DeviceId,
        processor: (data, balanceGroupIndex) =>
        {
            _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, balanceGroupIndex);
        }
    );

    // TODO: store the control record and push it to the new service.
}
|
|
|
|
|
|
|
2025-03-18 15:58:37 +08:00
|
|
|
|
#endregion
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#region 水表采集处理
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Gets the water-meter information list.
/// </summary>
/// <param name="gatherCode">Code of the gathering endpoint.</param>
/// <returns>Nothing in this base implementation; it always throws.</returns>
/// <exception cref="NotImplementedException">
/// Always thrown here; the message asks for an implementation per system type.
/// </exception>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Initialises the water-meter cache: writes one pending-task entry per collection
/// frequency, caches the meter records per frequency, and initialises the device
/// group balance control with the collected meter ids.
/// </summary>
/// <param name="gatherCode">Code of the gathering endpoint, passed to <see cref="GetWatermeterInfoList"/>.</param>
/// <returns>A task that completes when all cache writes have finished.</returns>
/// <exception cref="NullReferenceException">
/// Thrown when no water meters or no gather-item types are returned.
/// </exception>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
    var meterInfos = await GetWatermeterInfoList(gatherCode);
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
    }

    // Gather-item types are only validated here; the result is not used further.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        // Previously reported under the ammeter initialiser's name.
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
    }

    // Device ids handed to the device group balance control below.
    List<string> deviceIds = new List<string>();

    // Group by collection frequency. Materialised because the grouping is walked twice.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();
    if (_applicationOptions.FirstCollectionTime.HasValue == false)
    {
        _applicationOptions.FirstCollectionTime = DateTime.Now;
    }

    // First write the pending-task cache entry for every collection frequency.
    foreach (var densityGroup in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = null,
            TimeDensity = densityGroup.Key,
            // The next task time is derived from the first collection time.
            NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(densityGroup.Key),
        };

        // TODO: time points missed between the first collection time and now could
        // be tracked in IoTDB as a time series.

        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, densityGroup.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();

        // The keys depend only on the frequency, so build them once per frequency.
        // They were built with MeterTypeEnum.Ammeter, which put water meters under
        // the ammeter keys; the task key above and CreateMeterPublishTask both key
        // by the actual meter type.
        var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";

        // Group the meters of this frequency by concentrator address.
        var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
        foreach (var focusGroup in meterInfoGroup)
        {
            if (string.IsNullOrWhiteSpace(focusGroup.Key))
            {
                continue;
            }

            foreach (var subItem in focusGroup)
            {
                deviceIds.Add(subItem.MeterId.ToString());
                watermeterInfo.Add(subItem);
            }

            // NOTE(review): the list accumulates across concentrators, so earlier
            // meters are passed again on every iteration. Left unchanged because the
            // insert semantics are not visible here - confirm whether one insert
            // after this loop would be equivalent.
            await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
                redisCacheMeterInfoHashKey,
                redisCacheMeterInfoSetIndexKey,
                redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
        }
    }

    // Initialise the device group balance control.
    if (deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitWatermeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    _logger.LogInformation($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据完成");
}
|
|
|
|
|
|
|
2025-03-18 15:58:37 +08:00
|
|
|
|
/// <summary>
/// Water-meter data collection: reads the pending-task entry for the 60-minute
/// frequency and starts pushing the matching stored task records to Kafka.
/// </summary>
/// <returns>
/// A task that completes once the push has been started; the push itself is not awaited.
/// </returns>
public virtual async Task WatermeterScheduledMeterAutoReading()
{
    // Water meters currently have a single collection frequency: 60 minutes.
    int timeDensity = 60;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);

    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        // Previously logged under the ammeter fifteen-minute method name.
        _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息");
        return;
    }

    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();

    // Select the records whose PendingCopyReadTime equals the last task time.
    var conditions = new List<QueryCondition>()
    {
        new QueryCondition()
        {
            Field = "PendingCopyReadTime",
            Operator = "=",
            IsNumber = true,
            Value = pendingCopyReadTime
        }
    };

    // Not awaited, as before.
    // NOTE(review): the completion log below is written before the push finishes,
    // and a failure in the push is never observed - confirm that is intended.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });

    _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
|
2025-04-01 22:50:34 +08:00
|
|
|
|
|
2025-04-23 09:42:09 +08:00
|
|
|
|
/// <summary>
/// Creates the pending task records for one water meter.
/// </summary>
/// <param name="timeDensity">Collection frequency (not read by this method).</param>
/// <param name="watermeter">The water meter the tasks are built for.</param>
/// <param name="groupIndex">Group the concentrator belongs to (not read by this method).</param>
/// <param name="timestamps">Task batch time, stored as <c>PendingCopyReadTime</c>.</param>
/// <returns>
/// The task records that were built, or <c>null</c> when the meter type is not
/// supported or no protocol plugin is found.
/// </returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
    , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;

    // Only water meters and water flow meters are handled.
    if (watermeter.MeterType != MeterTypeEnum.WaterMeter && watermeter.MeterType != MeterTypeEnum.WaterMeterFlowmeter)
    {
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
        return null;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();

    // Default item code. The former per-model flow-meter branches all re-assigned
    // this same value, so they and the type-name local that fed them are gone.
    List<string> tempCodes = new List<string>() { "10_1" };

    // TODO: obtain the item codes from the protocol pool later.
    // Water meters on CJT_188_2018, DLT_645_1997 or DLT_645_2007 use 0C_129,
    // except brands containing "炬华有线", which use 0C_188.
    if (watermeter.MeterTypeName.Equals("水表") && (watermeter.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997) || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007)))
    {
        tempCodes = watermeter.MeterBrand.Contains("炬华有线")
            ? new List<string>() { "0C_188" }
            : new List<string>() { "0C_129" };
    }

    // Resolve the protocol plugin from the meter model.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
    if (protocolPlugin == null)
    {
        // This branch used to be empty, so the BuildAsync call below failed with
        // a NullReferenceException whenever no plugin was registered.
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}没有找到对应的协议组件,-105");
        return null;
    }

    foreach (var tempItem in tempCodes)
    {
        // Skip item codes configured as day-freeze or month-freeze.
        if (DayFreezeCodes.Contains(tempItem) || MonthFreezeCodes.Contains(tempItem))
        {
            continue;
        }

        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = watermeter.FocusAddress,
            Pn = watermeter.MeteringCode,
            ItemCode = tempItem,
            SubProtocolRequest = new SubProtocolBuildRequest()
            {
                MeterAddress = watermeter.MeterAddress,
                Password = watermeter.Password,
                ItemCode = tempItem,
            }
        });

        // One guard instead of two identical ones; the warning sat in the second,
        // unreachable copy and was never written.
        if (builderResponse == null || builderResponse.Data.Length <= 0)
        {
            _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
            continue;
        }

        string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
        var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
        {
            SystemName = SystemType,
            ProjectId = $"{watermeter.ProjectID}",
            // NOTE(review): a water-meter record is tagged with the Ammeter device
            // type. Left unchanged because consumers of DeviceType are not visible
            // here - confirm whether this should be the water-meter type.
            DeviceType = $"{MeterTypeEnum.Ammeter}",
            DeviceId = $"{watermeter.MeterId}",
            Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
            DatabaseBusiID = watermeter.DatabaseBusiID,
            PendingCopyReadTime = timestamps,
            CreationTime = currentTime,
            MeterAddress = watermeter.MeterAddress,
            AFN = builderResponse.AFn,
            Fn = builderResponse.Fn,
            Seq = builderResponse.Seq,
            MSA = builderResponse.MSA,
            ItemCode = tempItem,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = watermeter.MeteringCode,
            IssuedMessageId = GuidGenerator.Create().ToString(),
            IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
            IsReceived = false,
            ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
        };

        taskList.Add(meterReadingRecords);
    }

    return taskList;
}
|
2025-04-18 17:46:24 +08:00
|
|
|
|
|
2025-03-18 15:58:37 +08:00
|
|
|
|
#endregion
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#region 公共处理方法
|
2025-04-14 21:56:24 +08:00
|
|
|
|
|
2025-03-18 15:58:37 +08:00
|
|
|
|
/// <summary>
/// Decides whether a collection command should be generated now.
/// </summary>
/// <param name="nextTaskTime">The time the next task is due.</param>
/// <param name="timeDensity">Minutes added to the current local time before comparing; defaults to 0.</param>
/// <returns>
/// <c>true</c> when the current local time plus <paramref name="timeDensity"/> minutes
/// is at or after <paramref name="nextTaskTime"/>; otherwise <c>false</c>.
/// </returns>
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
    DateTime shiftedNow = DateTime.Now.AddMinutes(timeDensity);
    return shiftedNow >= nextTaskTime;
}
|
2025-03-18 22:43:24 +08:00
|
|
|
|
|
2025-04-18 17:46:24 +08:00
|
|
|
|
/// <summary>
/// Loads every cached meter of the given type and collection frequency and runs
/// the task-creation delegate for each one through the device group balance control.
/// </summary>
/// <typeparam name="T">Cached meter model type.</typeparam>
/// <param name="timeDensity">Collection frequency; part of the cache keys and passed to the delegate.</param>
/// <param name="nextTaskTime">Task timestamp for this frequency, passed to the delegate.</param>
/// <param name="meterType">Meter type; part of the cache keys.</param>
/// <param name="taskCreateAction">
/// Delegate that creates the tasks; called with the frequency, the meter, the
/// group index and <paramref name="nextTaskTime"/>.
/// </param>
/// <returns>A task that completes when the throttled processing call has completed.</returns>
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
{
    var timer = Stopwatch.StartNew();

    // Cache keys for this meter type and frequency. The set-index key was built
    // here as well but never used, so it is no longer computed.
    var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}";
    var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";

    List<T> meterInfos = new List<T>();
    decimal? cursor = null;
    string member = null;
    bool hasNext;

    // Page through the cache, 1000 records at a time, until no further page is reported.
    do
    {
        var page = await _redisDataCacheService.GetAllPagedData<T>(
            redisCacheMeterInfoHashKeyTemp,
            redisCacheMeterInfoZSetScoresIndexKeyTemp,
            pageSize: 1000,
            lastScore: cursor,
            lastMember: member);

        meterInfos.AddRange(page.Items);
        hasNext = page.HasNext;
        cursor = hasNext ? page.NextScore : null;
        member = hasNext ? page.NextMember : null;
    } while (hasNext);

    if (meterInfos.Count <= 0)
    {
        timer.Stop();
        _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
        return;
    }

    await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
        items: meterInfos,
        deviceIdSelector: data => data.FocusAddress,
        processor: (data, groupIndex) =>
        {
            taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
        }
    );

    timer.Stop();
    _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Pages through the task records returned by the IoTDB provider and hands each
/// record to the Kafka producer on the requested topic.
/// </summary>
/// <typeparam name="T">Task record entity type queried from IoTDB.</typeparam>
/// <param name="kafkaTopicName">Kafka topic the records are published to.</param>
/// <param name="options">Task query conditions. <c>PageIndex</c> is overwritten on every iteration.</param>
/// <returns>A task that completes once every page has been dispatched.</returns>
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
{
    if (string.IsNullOrWhiteSpace(kafkaTopicName))
    {
        // A missing topic is a failure, so it is logged at error level like the
        // other task-creation failures in this class.
        _logger.LogError($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空,-101");
        return;
    }

    int pageNumber = 0;
    bool hasNext;
    var stopwatch = Stopwatch.StartNew();

    do
    {
        // Page indexes start at 0 and advance by one per query.
        options.PageIndex = pageNumber++;

        var pageResult = await _dbProvider.QueryAsync<T>(options);
        hasNext = pageResult.HasNext;

        // Nothing to dispatch for a null or empty page; move on to the loop condition.
        var pageItems = pageResult.Items?.ToList();
        if (pageItems == null || pageItems.Count == 0)
        {
            continue;
        }

        await DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
            items: pageItems,
            deviceIdSelector: data => data.DeviceId,
            processor: (data, groupIndex) =>
            {
                // Publish to the topic requested by the caller (previously a hard-coded
                // topic constant was used and kafkaTopicName was ignored).
                // The produce task is intentionally discarded (fire-and-forget), so the
                // elapsed time logged below covers dispatching only.
                _ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
            }
        );
    } while (hasNext);

    stopwatch.Stop();
    _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Pushes a single task record to Kafka.
/// </summary>
/// <typeparam name="T">Type of the task record being published.</typeparam>
/// <param name="topicName">Topic name.</param>
/// <param name="taskRecord">Task record.</param>
/// <param name="partition">Target partition, i.e. the index of the group the concentrator number belongs to.</param>
/// <returns>A task that completes when the producer service call has finished.</returns>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="topicName"/> is null, empty or whitespace, or when
/// <paramref name="taskRecord"/> is null.
/// </exception>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
    T taskRecord, int partition) where T : class
{
    if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
    {
        // ArgumentException derives from Exception, so existing catch (Exception)
        // handlers keep working while callers get a more specific type.
        throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
    }

    await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
}
|
|
|
|
|
|
|
2025-03-14 14:24:38 +08:00
|
|
|
|
#endregion
|
2025-04-09 23:11:36 +08:00
|
|
|
|
|
2025-03-14 14:24:38 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|