1699 lines
78 KiB
C#
Raw Normal View History

2025-04-23 16:17:29 +08:00
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Enums;
2025-03-14 17:28:58 +08:00
using JiShe.CollectBus.Common.Extensions;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Helpers;
2025-04-16 17:36:46 +08:00
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.DataMigration.Options;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.GatherItem;
2025-04-18 11:31:23 +08:00
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
2025-04-23 16:17:29 +08:00
using JiShe.CollectBus.IotSystems.Ammeters;
2025-03-18 22:43:24 +08:00
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
2025-03-14 14:38:08 +08:00
using JiShe.CollectBus.IotSystems.Watermeter;
2025-04-19 14:00:13 +08:00
using JiShe.CollectBus.Kafka.Internal;
2025-04-25 14:23:06 +08:00
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
2025-03-14 14:24:38 +08:00
using Microsoft.Extensions.Logging;
2025-04-17 13:54:18 +08:00
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
2025-03-14 14:24:38 +08:00
2025-03-14 14:38:08 +08:00
namespace JiShe.CollectBus.ScheduledMeterReading
2025-03-14 14:24:38 +08:00
{
/// <summary>
/// 定时采集服务
/// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
2025-04-21 10:17:40 +08:00
private readonly IIoTDbProvider _dbProvider;
2025-04-30 12:36:54 +08:00
private readonly IDataChannelManageService _dataChannelManage;
2025-04-16 17:36:46 +08:00
private readonly IRedisDataCacheService _redisDataCacheService;
2025-04-24 17:48:20 +08:00
private readonly IProtocolService _protocolService;
2025-04-30 12:36:54 +08:00
private readonly DataMigrationOptions _dataMigrationOptions;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;
2025-04-24 17:48:20 +08:00
int pageSize = 10000;
2025-03-17 14:23:48 +08:00
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
2025-04-30 12:36:54 +08:00
IDataChannelManageService dataChannelManage,
2025-04-16 17:36:46 +08:00
IRedisDataCacheService redisDataCacheService,
2025-04-21 10:17:40 +08:00
IIoTDbProvider dbProvider,
2025-04-24 17:48:20 +08:00
IProtocolService protocolService,
2025-04-30 12:36:54 +08:00
IOptions<DataMigrationOptions> dataMigrationOptions,
IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions)
2025-03-14 14:24:38 +08:00
{
_logger = logger;
2025-04-08 17:44:42 +08:00
_dbProvider = dbProvider;
2025-04-30 12:36:54 +08:00
_dataChannelManage = dataChannelManage;
2025-04-16 17:36:46 +08:00
_redisDataCacheService = redisDataCacheService;
2025-04-24 17:48:20 +08:00
_protocolService = protocolService;
2025-04-22 17:58:14 +08:00
2025-04-30 12:36:54 +08:00
_dataMigrationOptions = dataMigrationOptions.Value;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
2025-04-30 12:36:54 +08:00
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 系统类型
/// </summary>
public abstract string SystemType { get; }
2025-04-09 17:29:30 +08:00
/// <summary>
/// 应用服务器部署标记
/// </summary>
public abstract string ServerTagName { get; }
2025-03-14 14:24:38 +08:00
/// <summary>
///电表日冻结采集项
/// </summary>
protected List<string> DayFreezeCodes = new List<string>() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", };
/// <summary>
/// 电表月冻结采集项
/// </summary>
protected List<string> MonthFreezeCodes = new List<string>() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", };
/// <summary>
/// 获取采集项列表
/// </summary>
/// <returns></returns>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
{
throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
}
/// <summary>
/// 构建待处理的下发指令任务处理
/// </summary>
/// <returns></returns>
public virtual async Task CreateToBeIssueTasks()
{
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
return;
}
2025-04-18 17:46:24 +08:00
var currentTime = DateTime.Now;
//定时抄读
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
if (tasksToBeIssueModel == null)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
continue;
}
2025-04-23 09:42:09 +08:00
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>ServerTagNametempArryay[3]=>TaskInfotempArryay[4]=>表计类别tempArryay[5]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
2025-04-15 09:43:51 +08:00
if (timeDensity > 15)
{
timeDensity = 15;
}
2025-04-25 12:01:15 +08:00
//电表定时广播校时,一天一次。
string currentTimeStr = $"{currentTime:HH:mm:00}";
if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
2025-04-25 12:01:15 +08:00
{
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return;
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
2025-04-30 12:36:54 +08:00
var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
2025-04-25 12:01:15 +08:00
}
2025-04-27 17:27:04 +08:00
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
2025-04-30 12:36:54 +08:00
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
2025-04-27 17:27:04 +08:00
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
2025-04-25 12:01:15 +08:00
{
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
2025-04-30 12:36:54 +08:00
var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
2025-04-27 17:27:04 +08:00
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
2025-04-30 12:36:54 +08:00
var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
2025-04-27 17:27:04 +08:00
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{
_ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
2025-04-30 12:36:54 +08:00
var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
2025-04-27 17:27:04 +08:00
});
}
else
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
2025-04-25 12:01:15 +08:00
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
2025-05-12 10:14:08 +08:00
if (!IsTaskTime(currentTaskTime, timeDensity))//todo 如果时间超过两个采集频率周期,就一直处理,直到追加到下一个采集频率周期。
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
//tasksToBeIssueModel.NextTaskTime;
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
_ = CreateMeterPublishTask<AmmeterInfo>(
2025-04-18 17:46:24 +08:00
timeDensity: timeDensity,
2025-04-22 17:58:14 +08:00
nextTaskTime: currentTaskTime,
2025-04-18 17:46:24 +08:00
meterType: MeterTypeEnum.Ammeter,
2025-04-24 17:48:20 +08:00
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
2025-04-15 09:43:51 +08:00
{
2025-04-24 17:48:20 +08:00
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
//_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
2025-04-18 17:46:24 +08:00
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
2025-04-18 17:46:24 +08:00
_ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity,
2025-04-22 17:58:14 +08:00
nextTaskTime: currentTaskTime,
2025-04-23 11:13:59 +08:00
meterType: MeterTypeEnum.WaterMeter,
2025-04-24 17:48:20 +08:00
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
2025-04-18 17:46:24 +08:00
{
2025-04-24 17:48:20 +08:00
var tempTask = await WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
2025-04-23 09:42:09 +08:00
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
2025-04-18 17:46:24 +08:00
});
}
else
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
}
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
2025-04-13 21:26:27 +08:00
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
2025-04-22 17:58:14 +08:00
tasksToBeIssueModel.LastTaskTime = currentTaskTime;
tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
2025-04-27 17:27:04 +08:00
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
//电表定时阀控任务处理。
var autoValveControlTask = await AmmeterScheduledAutoValveControl();
2025-04-27 17:27:04 +08:00
2025-04-30 12:36:54 +08:00
if (autoValveControlTask == null || autoValveControlTask.Count <= 0)
{
_logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
}
2025-03-14 14:24:38 +08:00
#region
/// <summary>
/// 获取电表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化电表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
//return;
// 创建取消令牌源
//var cts = new CancellationTokenSource();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader );
2025-04-30 12:36:54 +08:00
2025-04-23 09:42:09 +08:00
//此处代码不要删除
2025-04-28 16:37:31 +08:00
#if DEBUG
2025-05-08 08:43:37 +08:00
var timeDensity = "15";
var serverTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}";
2025-04-28 16:37:31 +08:00
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
2025-05-08 08:43:37 +08:00
List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew();
var allIds = new HashSet<string>();
decimal? score = null;
string member = null;
while (true)
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: score,
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
{
_logger.LogError($"{item.MemberId}Duplicate data found!");
}
}
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
timer1.Stop();
_logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return;
2025-04-28 16:37:31 +08:00
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
2025-03-14 14:24:38 +08:00
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
}
2025-04-15 17:40:17 +08:00
var timer = Stopwatch.StartNew();
2025-03-14 14:24:38 +08:00
2025-04-23 11:13:59 +08:00
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
2025-03-14 14:24:38 +08:00
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTaskTime;
}
2025-04-18 17:46:24 +08:00
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
2025-04-18 17:46:24 +08:00
TimeDensity = item.Key,
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
2025-04-18 17:46:24 +08:00
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
Dictionary<string, List<AmmeterInfo>> keyValuePairs = new Dictionary<string, List<AmmeterInfo>>();
2025-04-18 17:46:24 +08:00
2025-03-14 14:24:38 +08:00
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
2025-04-16 17:36:46 +08:00
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
2025-04-15 17:40:17 +08:00
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
2025-03-14 14:24:38 +08:00
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{
if (string.IsNullOrWhiteSpace(item.Key))//集中器号为空,跳过
2025-03-14 14:24:38 +08:00
{
continue;
}
2025-04-23 16:17:29 +08:00
2025-03-14 14:24:38 +08:00
foreach (var ammeter in item)
{
2025-04-23 11:13:59 +08:00
deviceIds.Add(ammeter.MeterId.ToString());
2025-03-14 14:24:38 +08:00
//处理ItemCode
2025-03-27 08:38:19 +08:00
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
2025-03-14 14:24:38 +08:00
{
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
List<string> itemCodeList = new List<string>();
foreach (var dataType in itemArr)
{
2025-03-17 11:34:30 +08:00
var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
2025-03-14 14:24:38 +08:00
var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
if (gatherItem != null)
{
if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
itemCodeList.Add(gatherItem.ItemCode);
}
}
2025-03-17 11:34:30 +08:00
#region
if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
{
itemCodeList.Add("10_95");
}
if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
{
itemCodeList.Add("10_109");
}
#endregion
2025-03-14 14:24:38 +08:00
}
#endregion
2025-03-17 11:34:30 +08:00
2025-03-14 14:24:38 +08:00
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
}
2025-04-15 17:40:17 +08:00
ammeterInfos.Add(ammeter);
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
keyValuePairs[ammeter.FocusAddress] = new List<AmmeterInfo>() {ammeter };
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter);
}
2025-03-14 14:24:38 +08:00
}
}
//await _redisDataCacheService.BatchInsertDataAsync2<AmmeterInfo>(
// redisCacheMeterInfoSetIndexKey,
// redisDeviceInfoHashCacheKey,
// keyValuePairs);
2025-04-16 17:36:46 +08:00
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,
redisDeviceInfoHashCacheKey,
ammeterInfos);
2025-03-14 14:24:38 +08:00
}
2025-04-10 14:12:14 +08:00
//初始化设备组负载控制
2025-04-23 11:13:59 +08:00
if (deviceIds == null || deviceIds.Count <= 0)
2025-04-10 14:12:14 +08:00
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
2025-04-23 11:13:59 +08:00
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
2025-04-10 14:12:14 +08:00
}
2025-04-15 17:40:17 +08:00
timer.Stop();
2025-05-12 10:14:08 +08:00
_logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 1分钟采集电表数据只获取任务数据下发不构建任务
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
2025-04-18 17:46:24 +08:00
int timeDensity = 5;
2025-04-23 09:42:09 +08:00
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
2025-03-14 14:24:38 +08:00
if (taskInfo == null)
2025-03-14 14:24:38 +08:00
{
2025-04-23 09:42:09 +08:00
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
2025-04-01 22:50:34 +08:00
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-03-18 22:43:24 +08:00
2025-04-23 09:42:09 +08:00
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
2025-04-23 16:17:29 +08:00
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
2025-04-23 09:42:09 +08:00
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
2025-04-23 16:17:29 +08:00
PageSize = pageSize,
2025-04-23 09:42:09 +08:00
Conditions = conditions,
});
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 5分钟采集电表数据
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
2025-03-14 14:24:38 +08:00
{
int timeDensity = 5;
2025-04-23 09:42:09 +08:00
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
2025-03-17 11:34:30 +08:00
if (taskInfo == null)
2025-03-14 14:24:38 +08:00
{
2025-04-23 09:42:09 +08:00
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
2025-03-18 22:43:24 +08:00
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-03-14 14:24:38 +08:00
2025-04-23 09:42:09 +08:00
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
2025-03-14 14:24:38 +08:00
2025-04-23 16:17:29 +08:00
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
2025-04-23 09:42:09 +08:00
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
2025-04-23 16:17:29 +08:00
PageSize = pageSize,
2025-04-23 09:42:09 +08:00
Conditions = conditions,
});
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 15分钟采集电表数据
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
2025-03-14 14:24:38 +08:00
{
int timeDensity = 15;
2025-04-23 09:42:09 +08:00
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
2025-03-20 16:40:27 +08:00
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
2025-03-14 14:24:38 +08:00
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-04-18 17:46:24 +08:00
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
2025-03-14 14:24:38 +08:00
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
2025-04-17 17:23:20 +08:00
});
2025-03-20 16:40:27 +08:00
2025-04-23 16:17:29 +08:00
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
2025-04-23 16:17:29 +08:00
PageSize = pageSize,
Conditions = conditions,
2025-04-23 09:42:09 +08:00
});
2025-03-14 14:24:38 +08:00
}
/// <summary>
2025-04-18 17:46:24 +08:00
/// 创建电表待发送的任务数据
2025-03-14 14:24:38 +08:00
/// </summary>
/// <param name="timeDensity">采集频率</param>
2025-04-18 17:46:24 +08:00
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
2025-03-14 14:24:38 +08:00
/// <returns></returns>
2025-04-24 17:48:20 +08:00
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
2025-04-14 17:38:34 +08:00
{
var currentTime = DateTime.Now;
2025-04-18 11:31:23 +08:00
2025-04-24 17:48:20 +08:00
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105");
return null;
2025-04-24 17:48:20 +08:00
}
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null;
}
2025-04-14 17:38:34 +08:00
if (ammeterInfo.State.Equals(2))
2025-04-14 17:38:34 +08:00
{
2025-04-30 12:36:54 +08:00
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
2025-04-14 17:38:34 +08:00
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
return null;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
return null;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
return null;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
2025-04-30 12:36:54 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
return null;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("0C_49");
2025-04-14 17:38:34 +08:00
}
if (tempSubCodes.Contains("0C_149"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("0C_149");
2025-04-14 17:38:34 +08:00
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("10_97");
2025-04-14 17:38:34 +08:00
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
2025-04-14 17:38:34 +08:00
{
2025-04-15 15:49:51 +08:00
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return null;
2025-04-14 17:38:34 +08:00
}
else
2025-04-14 17:38:34 +08:00
{
tempCodes = tempSubCodes;
2025-04-14 17:38:34 +08:00
}
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
2025-04-14 17:38:34 +08:00
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
2025-04-14 17:38:34 +08:00
{
continue;
}
2025-04-24 17:48:20 +08:00
//var itemCodeArr = tempItem.Split('_');
//var aFNStr = itemCodeArr[0];
//var aFN = (AFN)aFNStr.HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
2025-04-14 17:38:34 +08:00
2025-04-24 17:48:20 +08:00
//TODO:特殊表
2025-04-24 23:39:39 +08:00
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
2025-04-24 17:48:20 +08:00
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = tempItem,
2025-04-30 17:11:09 +08:00
DataTimeMark = new Protocol.DataTimeMark()
{
2025-05-11 21:53:55 +08:00
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
2025-04-30 17:11:09 +08:00
Point = 1,
DataTime = timestamps,
}
2025-04-24 17:48:20 +08:00
});
2025-04-18 11:31:23 +08:00
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
2025-04-15 15:49:51 +08:00
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
2025-04-14 17:38:34 +08:00
2025-04-27 17:27:04 +08:00
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: tempItem,
subItemCode: null,
pendingCopyReadTime: timestamps,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity);
taskList.Add(meterReadingRecords);
2025-04-14 17:38:34 +08:00
}
2025-04-15 09:43:51 +08:00
return taskList;
2025-04-15 15:49:51 +08:00
}
2025-04-23 16:17:29 +08:00
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl()
2025-04-23 16:17:29 +08:00
{
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
}
2025-04-25 12:01:15 +08:00
/// <summary>
/// 电表自动校时
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
2025-04-25 12:01:15 +08:00
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
//判断是否是自动校时时间
2025-04-27 17:27:04 +08:00
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
2025-04-25 12:01:15 +08:00
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
2025-04-30 12:36:54 +08:00
return null;
2025-04-25 12:01:15 +08:00
}
2025-04-27 17:27:04 +08:00
2025-04-25 12:01:15 +08:00
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
var subItemCode = T6452007PacketItemCodeConst.C08;
2025-04-25 12:01:15 +08:00
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
2025-04-30 12:36:54 +08:00
return null;
2025-04-25 12:01:15 +08:00
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
2025-04-25 12:01:15 +08:00
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = ammeterInfo.AmmerterAddress,
Password = ammeterInfo.Password,
ItemCode = subItemCode,
2025-04-25 12:01:15 +08:00
}
});
2025-04-27 17:27:04 +08:00
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: subItemCode,
pendingCopyReadTime: currentTime,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime);
2025-04-25 12:01:15 +08:00
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
2025-04-30 12:36:54 +08:00
return null;
2025-04-25 12:01:15 +08:00
}
2025-04-30 12:36:54 +08:00
return null;
2025-04-25 12:01:15 +08:00
//todo 阀控记录入库,推送到新的服务
}
catch (Exception)
{
throw;
}
}
2025-04-27 17:27:04 +08:00
/// <summary>
/// 日冻结抄读
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
2025-04-27 17:27:04 +08:00
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
2025-04-29 10:02:10 +08:00
2025-04-27 17:27:04 +08:00
foreach (var item in DayFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: item,
subItemCode: null,
pendingCopyReadTime: currentTime,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze);
2025-04-27 17:27:04 +08:00
taskList.Add(meterReadingRecords);
}
2025-04-29 10:02:10 +08:00
2025-04-27 17:27:04 +08:00
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
2025-04-30 12:36:54 +08:00
return taskList;
2025-04-29 10:02:10 +08:00
2025-04-27 17:27:04 +08:00
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 月冻结数据抄读
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
2025-04-27 17:27:04 +08:00
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
foreach (var item in DayFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: item,
subItemCode: null,
pendingCopyReadTime: currentTime,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze);
2025-04-27 17:27:04 +08:00
taskList.Add(meterReadingRecords);
}
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
2025-04-30 12:36:54 +08:00
return null;
2025-04-27 17:27:04 +08:00
}
2025-04-30 12:36:54 +08:00
return taskList;
2025-04-27 17:27:04 +08:00
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 电表监控重试抄读任务
/// </summary>
/// <param name="retryReadingEnum"></param>
/// <returns></returns>
public virtual async Task AmmeterScheduledRetryReading(RetryReadingEnum retryReadingEnum)
{
// 根据任务获取不同的锁
var tryLock = FreeRedisProvider.Instance.Lock(retryReadingEnum.ToString(), 10);
try
{
if (tryLock != null)
{
// 轮询IotDB未成果下发电表数据
var conditions = new List<QueryCondition>();
// 下次发布时间小于等于当前时间
conditions.Add(new QueryCondition()
{
Field = "NextSendTime",
Operator = "<",
IsNumber = false,
Value = DateTime.Now
});
// 重试次数少于3次
conditions.Add(new QueryCondition()
{
Field = "SendNum",
Operator = "<",
IsNumber = true,
Value = 3
});
//已发布的
conditions.Add(new QueryCondition()
{
Field = "IsSend",
Operator = "=",
IsNumber = false,
Value = true
});
// 未响应
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
2025-04-30 12:36:54 +08:00
//await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
//{
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1,
// PageSize = pageSize,
// Conditions = conditions,
//});
// 释放锁
tryLock.Unlock();
}
}
catch (Exception)
{
// 释放锁
tryLock.Unlock();
throw;
}
}
#endregion
#region
/// <summary>
/// 获取水表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化水表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
var meterInfos = await GetWatermeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
2025-03-14 14:24:38 +08:00
2025-04-23 11:13:59 +08:00
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
2025-04-23 09:42:09 +08:00
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
2025-04-23 09:42:09 +08:00
{
_applicationOptions.FirstCollectionTime = currentTime;
2025-04-23 09:42:09 +08:00
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
2025-04-30 12:36:54 +08:00
TimeDensity = item.Key,
2025-04-23 09:42:09 +08:00
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间
2025-04-23 09:42:09 +08:00
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
2025-04-23 09:42:09 +08:00
List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{
if (string.IsNullOrWhiteSpace(item.Key))
{
continue;
2025-03-14 17:28:58 +08:00
}
2025-03-14 14:24:38 +08:00
2025-04-23 09:42:09 +08:00
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
foreach (var subItem in item)
{
2025-04-23 11:13:59 +08:00
deviceIds.Add(subItem.MeterId.ToString());
2025-04-23 09:42:09 +08:00
watermeterInfo.Add(subItem);
}
2025-04-23 09:42:09 +08:00
await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisDeviceInfoHashCacheKey,
2025-04-23 09:42:09 +08:00
redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
2025-03-14 14:24:38 +08:00
}
2025-04-23 09:42:09 +08:00
}
2025-04-23 09:42:09 +08:00
//初始化设备组负载控制
2025-04-23 11:13:59 +08:00
if (deviceIds == null || deviceIds.Count <= 0)
2025-04-23 09:42:09 +08:00
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
2025-04-23 09:42:09 +08:00
}
else
{
2025-04-23 11:13:59 +08:00
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
2025-03-17 11:34:30 +08:00
}
2025-04-18 17:46:24 +08:00
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 水表数据采集
/// </summary>
/// <returns></returns>
2025-04-29 10:02:10 +08:00
public virtual async Task WatermeterScheduledMeterAutoReadding()
{
//获取缓存中的水表信息
2025-04-23 09:42:09 +08:00
int timeDensity = 60;//水表目前只有一个采集频率 60分钟
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
{
2025-04-23 09:42:09 +08:00
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
2025-04-23 09:42:09 +08:00
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
2025-04-23 09:42:09 +08:00
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
2025-04-30 12:36:54 +08:00
//_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
//{
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1,
// PageSize = pageSize,
// Conditions = conditions,
//});
2025-04-01 22:50:34 +08:00
2025-04-29 10:02:10 +08:00
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟采集水表数据处理完成");
2025-04-23 09:42:09 +08:00
}
2025-04-01 22:50:34 +08:00
2025-04-23 09:42:09 +08:00
/// <summary>
/// 创建水表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="watermeter">水表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">时间格式的任务批次名称</param>
/// <returns></returns>
2025-04-24 17:48:20 +08:00
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
2025-04-23 09:42:09 +08:00
, WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
2025-03-18 22:43:24 +08:00
2025-04-23 09:42:09 +08:00
string typeName;
if (watermeter.MeterType == MeterTypeEnum.WaterMeter)
{
timeDensity = watermeter.TimeDensity;//水表默认为60分钟
typeName = watermeter.LinkType;
if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
{
typeName = watermeter.MeterBrand;
}
}
2025-04-23 09:42:09 +08:00
else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
{
2025-04-23 09:42:09 +08:00
typeName = watermeter.MeterBrand;
}
else
{
_logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
return null;
}
2025-04-23 09:42:09 +08:00
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
2025-04-27 17:27:04 +08:00
//根据表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
if (protocolPlugin == null)
2025-04-23 09:42:09 +08:00
{
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = T1882018PacketItemCodeConst.CTR0190;
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
2025-04-23 09:42:09 +08:00
{
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = watermeter.MeterAddress,
Password = watermeter.Password,
ItemCode = subItemCode,
}
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
2025-04-23 09:42:09 +08:00
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
return null;
2025-04-23 09:42:09 +08:00
}
if (builderResponse == null || builderResponse.Data.Length <= 0)
2025-04-23 09:42:09 +08:00
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{itemCode}未能正确获取报文。");
return null;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
2025-04-23 16:17:29 +08:00
{
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
2025-04-29 10:02:10 +08:00
PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = watermeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
};
taskList.Add(meterReadingRecords);
2025-04-18 17:46:24 +08:00
return taskList;
}
#endregion
#region
/// <summary>
/// 自动获取终端版
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
2025-04-23 09:42:09 +08:00
{
//判断是否是自动获取版本号时间
2025-04-27 17:27:04 +08:00
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
2025-04-23 09:42:09 +08:00
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间");
2025-04-30 12:36:54 +08:00
return null;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H;
//var subItemCode = T6452007PacketItemCodeConst.C08;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
2025-04-23 09:42:09 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器自动获取版本号{currentTime}没有找到对应的协议组件,-105");
2025-04-30 12:36:54 +08:00
return null;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
2025-04-24 23:39:39 +08:00
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
2025-04-23 09:42:09 +08:00
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
//SubProtocolRequest = new SubProtocolBuildRequest()
//{
// MeterAddress = ammeterInfo.AmmerterAddress,
// Password = ammeterInfo.Password,
// ItemCode = subItemCode,
//}
});
2025-04-27 17:27:04 +08:00
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: null,
pendingCopyReadTime: currentTime,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TerminalVersion);
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
2025-04-30 12:36:54 +08:00
return null;
}
2025-04-30 12:36:54 +08:00
return taskList;
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 自动获取远程通信模块(SIM)版本信息
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
2025-04-30 12:36:54 +08:00
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
//判断是否是自动获取版本号时间
2025-04-27 17:27:04 +08:00
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
2025-04-23 09:42:09 +08:00
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
2025-04-30 12:36:54 +08:00
return null;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN09H;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
2025-04-23 09:42:09 +08:00
{
_logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有找到对应的协议组件,-105");
2025-04-30 12:36:54 +08:00
return null;
2025-04-23 09:42:09 +08:00
}
2025-04-18 17:46:24 +08:00
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
});
2025-04-18 17:46:24 +08:00
2025-04-27 17:27:04 +08:00
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: null,
pendingCopyReadTime: currentTime,
2025-04-29 10:02:10 +08:00
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TelematicsModule);
2025-04-23 09:42:09 +08:00
taskList.Add(meterReadingRecords);
2025-04-18 17:46:24 +08:00
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
2025-04-30 12:36:54 +08:00
return null;
}
2025-04-30 12:36:54 +08:00
return taskList;
}
catch (Exception)
{
throw;
}
}
#endregion
#region
/// <summary>
/// 判断是否需要生成采集指令
/// </summary>
/// <param name="nextTaskTime"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
2025-04-23 16:17:29 +08:00
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
{
return true;
}
return false;
}
2025-03-18 22:43:24 +08:00
2025-04-18 17:46:24 +08:00
/// <summary>
/// 创建表的待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="nextTaskTime">采集频率对应的任务时间戳</param>
2025-04-18 17:46:24 +08:00
/// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns>
2025-04-23 16:17:29 +08:00
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
2025-04-18 17:46:24 +08:00
{
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}";
List<T> meterInfos = new List<T>();
decimal? cursor = null;
string member = null;
while (true)
2025-04-23 09:42:09 +08:00
{
var page = await _redisDataCacheService.GetAllPagedData<T>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
if (!page.HasNext)
{
break;
}
cursor = page.NextScore;
member = page.NextMember;
}
2025-04-23 09:42:09 +08:00
//var page = await _redisDataCacheService.GetAllPagedData<T>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 10,
// lastScore: cursor,
// lastMember: member);
//meterInfos.AddRange(page.Items);
2025-04-18 17:46:24 +08:00
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
2025-04-23 09:42:09 +08:00
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
2025-04-18 17:46:24 +08:00
return;
}
timer.Stop();
2025-04-30 12:36:54 +08:00
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
2025-04-18 17:46:24 +08:00
timer.Restart();
2025-04-18 17:46:24 +08:00
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.MeterId.ToString(),
2025-04-18 17:46:24 +08:00
processor: (data, groupIndex) =>
{
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
2025-04-18 17:46:24 +08:00
}
);
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
2025-04-18 17:46:24 +08:00
}
/// <summary>
/// 创建Kafka消息
/// </summary>
2025-04-23 16:17:29 +08:00
/// <typeparam name="T"></typeparam>
/// <param name="kafkaTopicName">kafka主题名称</param>
/// <param name="options">任务查询条件</param>
2025-04-18 17:46:24 +08:00
/// <returns></returns>
2025-04-23 16:17:29 +08:00
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
2025-04-18 17:46:24 +08:00
{
2025-04-23 16:17:29 +08:00
if (string.IsNullOrWhiteSpace(kafkaTopicName))
{
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
return;
}
int pageNumber = 103;
2025-04-18 17:46:24 +08:00
bool hasNext;
var stopwatch = Stopwatch.StartNew();
2025-04-24 17:48:20 +08:00
do
{
var stopwatch2 = Stopwatch.StartNew();
options.PageIndex = pageNumber++;
var pageResult = await _dbProvider.QueryAsync<T>(options);
hasNext = pageResult.HasNext;
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
items: pageResult.Items.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
}
);
stopwatch2.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
} while (hasNext);
2025-04-18 17:46:24 +08:00
stopwatch.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
2025-04-18 17:46:24 +08:00
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
2025-04-23 16:17:29 +08:00
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
T taskRecord, int partition) where T : class
2025-04-18 17:46:24 +08:00
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
2025-04-30 12:36:54 +08:00
// await _dataChannelManage.ProduceAsync<T>(topicName, taskRecord, partition);
2025-04-18 17:46:24 +08:00
}
2025-04-27 17:27:04 +08:00
/// <summary>
/// 构建报文保存对象
/// </summary>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="timestamps">IoTDB存储时标</param>
/// <param name="builderResponse">报文构建返回结果</param>
/// <param name="itemCode">端到云协议采集项编码</param>
/// <param name="subItemCode">端到端采集项编码</param>
/// <param name="pendingCopyReadTime">待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳</param>
/// <param name="creationTime">数据创建时间戳</param>
2025-04-29 10:02:10 +08:00
/// <param name="packetType">数据包类型</param>
2025-04-27 17:27:04 +08:00
/// <returns></returns>
2025-04-29 10:02:10 +08:00
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType)
2025-04-27 17:27:04 +08:00
{
try
2025-04-27 17:27:04 +08:00
{
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
return new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = timestamps,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime,
MeterAddress = ammeterInfo.AmmerterAddress,
PacketType = (int)packetType,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
FocusId = ammeterInfo.FocusId,
FocusAddress = ammeterInfo.FocusAddress,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = Guid.NewGuid().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
}
catch (Exception ex)
{
throw ex;
}
}
2025-03-14 14:24:38 +08:00
#endregion
2025-03-14 14:24:38 +08:00
}
}