1757 lines
80 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.DataChannels;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol.Models;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Volo.Abp.Guids;
namespace JiShe.CollectBus.ScheduledMeterReading
{
/// <summary>
/// 定时采集服务
/// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
private readonly IIoTDbProvider _dbProvider;
private readonly IDataChannelManageService _dataChannelManage;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly IProtocolService _protocolService;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;
private readonly IGuidGenerator _guidGenerator;
int pageSize = 10000;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
IDataChannelManageService dataChannelManage,
IRedisDataCacheService redisDataCacheService,
IIoTDbProvider dbProvider,
IProtocolService protocolService,
IGuidGenerator guidGenerator,
IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions)
{
_logger = logger;
_dbProvider = dbProvider;
_dataChannelManage = dataChannelManage;
_redisDataCacheService = redisDataCacheService;
_protocolService = protocolService;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
_guidGenerator = guidGenerator;
}
/// <summary>
/// 系统类型
/// </summary>
public abstract string SystemType { get; }
/// <summary>
/// 应用服务器部署标记
/// </summary>
public abstract string ServerTagName { get; }
/// <summary>
///电表日冻结采集项
/// </summary>
protected List<string> DayFreezeCodes = new List<string>() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", };
/// <summary>
/// 电表月冻结采集项
/// </summary>
protected List<string> MonthFreezeCodes = new List<string>() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", };
/// <summary>
/// 获取采集项列表
/// </summary>
/// <returns></returns>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
{
throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
}
/// <summary>
/// 构建待处理的下发指令任务处理
/// </summary>
/// <returns></returns>
public virtual async Task CreateToBeIssueTasks()
{
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
return;
}
var currentTime = DateTime.Now;
//定时抄读
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
if (tasksToBeIssueModel == null)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
continue;
}
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>ServerTagNametempArryay[3]=>TaskInfotempArryay[4]=>表计类别tempArryay[5]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if (timeDensity > 15)
{
timeDensity = 15;
}
//电表定时广播校时,一天一次。
string currentTimeStr = $"{currentTime:HH:mm:00}";
if (string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))//自动校时
{
//_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
//return;
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledAutomaticVerificationTime(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表自动校时 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器终端版本信息 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await ConcentratorScheduledAutomaticGetTelematicsModule(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"集中器SIM卡读取 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))//月冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticMonthFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表月冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmeterScheduledGetAutomaticDayFreezeData(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表日冻结 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, tempTask));
});
}
else
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 不是自动校时、采集终端信息等时间,继续处理其他");
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
if (!IsTaskTime(currentTaskTime, timeDensity))//todo 如果时间超过两个采集频率周期,就一直处理,直到追加到下一个采集频率周期。
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempTask));
});
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
_ = CreateMeterPublishTask<DeviceInfo>(
timeDensity: timeDensity,
nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.WaterMeter,
taskCreateAction: async (timeDensity, data, groupIndex, timestamps) =>
{
var tempTask = await WatermeterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps);
if (tempTask == null || tempTask.Count <= 0)
{
_logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
return;
}
_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempTask));
});
}
else
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
}
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.LastTaskTime = currentTaskTime;
tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
////电表定时阀控任务处理。
//var autoValveControlTask = await AmmeterScheduledAutoValveControl();
//if (autoValveControlTask == null || autoValveControlTask.Count <= 0)
//{
// _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务");
// return;
//}
//_ = _dataChannelManage.ScheduledMeterTaskWriterAsync(DataChannelManage.TaskDataChannel.Writer, (ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask));
}
#region
/// <summary>
/// 获取电表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<DeviceInfo>> GetAmmeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化电表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
//return;
try
{
// 创建取消令牌源
//var cts = new CancellationTokenSource();
await _dbProvider.GetSessionPool(true).InitTableSessionModelAsync();
_ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader);
// //此处代码不要删除
//#if DEBUG
// var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo";
// var timer1 = Stopwatch.StartNew();
// Dictionary<string, List<DeviceInfo>> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll<List<DeviceInfo>>(redisCacheDeviceInfoHashKeyTemp);
// List<DeviceInfo> meterInfos = new List<DeviceInfo>();
// List<string> focusAddressDataLista = new List<string>();
// foreach (var item in keyValuePairsTemps)
// {
// foreach (var subItem in item.Value)
// {
// if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15)
// {
// meterInfos.Add(subItem);
// focusAddressDataLista.Add(subItem.MeterId.ToString());
// }
// }
// }
// timer1.Stop();
// _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
// return;
//#else
// var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif
var meterInfos = await GetAmmeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
return;
}
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,读取数据成功");
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
return;
}
var timer = Stopwatch.StartNew();
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTaskTime;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key,
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
//设备分组集合key
string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairs = new Dictionary<string, List<DeviceInfo>>();
//处理设备缓存信息
foreach (var ammeter in meterInfos)
{
deviceIds.Add(ammeter.MeterId.ToString());
//处理ItemCode
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
{
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
List<string> itemCodeList = new List<string>();
foreach (var dataType in itemArr)
{
var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
if (gatherItem != null)
{
if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
itemCodeList.Add(gatherItem.ItemCode);
}
}
#region
if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
{
itemCodeList.Add("10_95");
}
if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
{
itemCodeList.Add("10_109");
}
#endregion
}
#endregion
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
}
var tempItemCodeList = new List<string>() { "10_97" };
ammeter.ItemCodes = tempItemCodeList.Serialize();
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
keyValuePairs[ammeter.FocusAddress] = new List<DeviceInfo>() { ammeter.Adapt<DeviceInfo>() };
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter.Adapt<DeviceInfo>());
}
}
await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
redisCacheDeviceGroupSetIndexKey,
redisCacheDeviceInfoHashKey,
keyValuePairs);
//初始化设备组负载控制
if (deviceIds == null || deviceIds.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
timer.Stop();
_logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据异常");
throw ex;
}
}
/// <summary>
/// 1分钟采集电表数据只获取任务数据下发不构建任务
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
int timeDensity = 5;
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = pageSize,
Conditions = conditions,
});
}
/// <summary>
/// 5分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{
int timeDensity = 5;
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = pageSize,
Conditions = conditions,
});
}
/// <summary>
/// 15分钟采集电表数据
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
int timeDensity = 15;
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
PageSize = pageSize,
Conditions = conditions,
});
}
/// <summary>
/// 创建电表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105");
return null;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return null;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return null;
}
if (ammeterInfo.State.Equals(2))
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return null;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
return null;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
return null;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
return null;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
//_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
return null;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return null;
}
else
{
tempCodes = tempSubCodes;
}
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
//var itemCodeArr = tempItem.Split('_');
//var aFNStr = itemCodeArr[0];
//var aFN = (AFN)aFNStr.HexToDec();
//var fn = int.Parse(itemCodeArr[1]);
//特殊编码映射
var itemCodeInfo = T37612012PacketItemCodeConst.MappingItemCodeTo645SubCodeRelationship(tempItem);
//TODO:特殊表
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = itemCodeInfo.Item1 == T37612012PacketItemCodeConst.AFN10HFN01H ? 0 : ammeterInfo.MeteringCode,
ItemCode = itemCodeInfo.Item1,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
DataTime = timestamps,
},
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
MeteringPort = ammeterInfo.MeteringPort,
Baudrate = ammeterInfo.Baudrate,
ItemCode = itemCodeInfo.Item2, //10_97 => 11_02_80_00_02
}
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: DateTimeOffset.Now.ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCodeInfo.Item1,
subItemCode: itemCodeInfo.Item2,
pendingCopyReadTime: timestamps,
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity,
_guidGenerator);
taskList.Add(meterReadingRecords);
}
return taskList;
}
/// <summary>
/// 获取电表阀控配置
/// </summary>
/// <param name="currentTime">阀控的时间</param>
/// <returns></returns>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 电表自动阀控
/// </summary>
/// <returns></returns>
public virtual Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutoValveControl()
{
throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
}
/// <summary>
/// 电表自动校时
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//判断是否是自动校时时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
var subItemCode = T6452007PacketItemCodeConst.C08;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = ammeterInfo.MeterAddress,
Password = ammeterInfo.Password,
ItemCode = subItemCode,
}
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: subItemCode,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime,
_guidGenerator);
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null;
}
return null;
//todo 阀控记录入库,推送到新的服务
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 日冻结抄读
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//判断是否是日冻结抄读时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticDayFreezeData)} 非电表日冻结抄读时间,暂不处理");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表日冻结抄读运行时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
foreach (var item in DayFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
DataTime = currentTime.AddDays(-1),//日冻结抄读时间为昨天
},
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: item,
subItemCode: null,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze,
_guidGenerator);
taskList.Add(meterReadingRecords);
}
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
return null;
}
return taskList;
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 月冻结数据抄读
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//需要检查是不是每月1号抄读上个月的数据
if (currentTime.Date != currentTime.FirstDayOfMonth().Date)
{
_logger.LogInformation($"{nameof(AmmeterScheduledGetAutomaticMonthFreezeData)} 非月冻结数据抄读时间,暂不处理");
return null;
}
else
{
timestamps = currentTime.LastDayOfPrdviousMonth();
}
//判断是否是月冻结数据抄读
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticMonthFreezeTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 非电表月冻结抄读时间,暂不处理");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表月冻结抄读时间{currentTime}没有找到对应的协议组件,-105");
return null;
}
foreach (var item in MonthFreezeCodes)
{
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = item,
DataTimeMark = new Protocol.DataTimeMark()
{
Density = ammeterInfo.TimeDensity.GetFocusDensity(),//转换成协议的值
Point = 1,
#if DEBUG
DataTime = currentTime.AddMonths(-1),//月冻结抄读时间为上个月
#else
DataTime = timestamps,//月冻结抄读时间为上个月
#endif
},
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: item,
subItemCode: null,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze,
_guidGenerator);
taskList.Add(meterReadingRecords);
}
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 日冻结抄读时间{currentTime}没有任务生成,-106");
return null;
}
return taskList;
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 电表监控重试抄读任务
/// </summary>
/// <param name="retryReadingEnum"></param>
/// <returns></returns>
public virtual async Task AmmeterScheduledRetryReading(RetryReadingEnum retryReadingEnum)
{
// 根据任务获取不同的锁
var tryLock = FreeRedisProvider.Instance.Lock(retryReadingEnum.ToString(), 10);
try
{
if (tryLock != null)
{
// 轮询IotDB未成果下发电表数据
var conditions = new List<QueryCondition>();
// 下次发布时间小于等于当前时间
conditions.Add(new QueryCondition()
{
Field = "NextSendTime",
Operator = "<",
IsNumber = false,
Value = DateTime.Now
});
// 重试次数少于3次
conditions.Add(new QueryCondition()
{
Field = "SendNum",
Operator = "<",
IsNumber = true,
Value = 3
});
//已发布的
conditions.Add(new QueryCondition()
{
Field = "IsSend",
Operator = "=",
IsNumber = false,
Value = true
});
// 未响应
conditions.Add(new QueryCondition()
{
Field = "IsReceived",
Operator = "=",
IsNumber = false,
Value = false
});
//await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
//{
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1,
// PageSize = pageSize,
// Conditions = conditions,
//});
// 释放锁
tryLock.Unlock();
}
}
catch (Exception)
{
// 释放锁
tryLock.Unlock();
throw;
}
}
#endregion
#region
/// <summary>
/// 获取水表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<DeviceInfo>> GetWatermeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化水表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
var meterInfos = await GetWatermeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
List<string> deviceIds = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTime;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key,
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
//设备分组集合key
string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
Dictionary<string, List<DeviceInfo>> keyValuePairs = new Dictionary<string, List<DeviceInfo>>();
foreach (var subItem in meterInfos)
{
deviceIds.Add(subItem.MeterId.ToString());
if (!keyValuePairs.ContainsKey(subItem.FocusAddress))
{
keyValuePairs[subItem.FocusAddress] = new List<DeviceInfo>() { subItem };
}
else
{
keyValuePairs[subItem.FocusAddress].Add(subItem);
}
}
await _redisDataCacheService.BatchInsertDataAsync<DeviceInfo>(
redisCacheDeviceGroupSetIndexKey,
redisCacheDeviceInfoHashKey,
keyValuePairs);
//初始化设备组负载控制
if (deviceIds == null || deviceIds.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
}
/// <summary>
/// 水表数据采集
/// </summary>
/// <returns></returns>
public virtual async Task WatermeterScheduledMeterAutoReadding()
{
//获取缓存中的水表信息
int timeDensity = 60;//水表目前只有一个采集频率 60分钟
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
{
Field = "PendingCopyReadTime",
Operator = "=",
IsNumber = true,
Value = pendingCopyReadTime
});
//_ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
//{
// TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
// PageIndex = 1,
// PageSize = pageSize,
// Conditions = conditions,
//});
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReadding)} {timeDensity}分钟采集水表数据处理完成");
}
/// <summary>
/// 创建水表待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="watermeter">水表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">时间格式的任务批次名称</param>
/// <returns></returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
, DeviceInfo watermeter, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string typeName;
if (watermeter.MeterType == MeterTypeEnum.WaterMeter)
{
timeDensity = watermeter.TimeDensity;//水表默认为60分钟
typeName = watermeter.LinkType;
if (watermeter.BrandType.Contains("泉高阀门") || watermeter.BrandType.Equals("LXSY-山水翔"))
{
typeName = watermeter.BrandType;
}
}
else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
{
typeName = watermeter.BrandType;
}
else
{
_logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
return null;
}
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
//根据表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType);
if (protocolPlugin == null)
{
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
}
string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H;
string subItemCode = T1882018PacketItemCodeConst.CTR0190;
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = watermeter.FocusAddress,
Pn = watermeter.MeteringCode,
ItemCode = itemCode,
SubProtocolRequest = new SubProtocolBuildRequest()
{
MeterAddress = watermeter.MeterAddress,
Password = watermeter.Password,
ItemCode = subItemCode,
}
});
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
return null;
}
if (builderResponse == null || builderResponse.Data.Length <= 0)
{
_logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{itemCode}未能正确获取报文。");
return null;
}
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{watermeter.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{watermeter.MeterId}",
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = watermeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
IsReceived = false,
ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
};
taskList.Add(meterReadingRecords);
return taskList;
}
#endregion
#region
/// <summary>
/// 自动获取终端版
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//判断是否是自动获取版本号时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTerminalVersion)} 集中器自动获取版本号,非自动处理时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN01H;
//var subItemCode = T6452007PacketItemCodeConst.C08;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 集中器自动获取版本号{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
//SubProtocolRequest = new SubProtocolBuildRequest()
//{
// MeterAddress = ammeterInfo.AmmerterAddress,
// Password = ammeterInfo.Password,
// ItemCode = subItemCode,
//}
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: null,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TerminalVersion,
_guidGenerator);
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null;
}
return taskList;
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 自动获取远程通信模块(SIM)版本信息
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="timestamps">采集频率对应的时间戳</param>
/// <returns></returns>
public virtual async Task<List<MeterReadingTelemetryPacketInfo>> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
var currentTime = DateTime.Now;
string currentTimeStr = $"{currentTime:HH:mm:00}";
try
{
#if DEBUG
#else
//判断是否是自动获取版本号时间
if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))
{
_logger.LogInformation($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息,非自动处理时间");
return null;
}
#endif
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
var itemCode = T37612012PacketItemCodeConst.AFN09HFN09H;
//根据电表型号获取协议插件
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(ConcentratorScheduledAutomaticGetTelematicsModule)} 自动获取远程通信模块(SIM)版本信息{currentTime}没有找到对应的协议组件,-105");
return null;
}
ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Pn = ammeterInfo.MeteringCode,
ItemCode = itemCode,
});
var meterReadingRecords = CreateAmmeterPacketInfo(
ammeterInfo: ammeterInfo,
timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
builderResponse: builderResponse,
itemCode: itemCode,
subItemCode: null,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: TelemetryPacketTypeEnum.TelematicsModule,
_guidGenerator);
taskList.Add(meterReadingRecords);
if (taskList == null || taskList.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有自动阀控任务生成,-106");
return null;
}
return taskList;
}
catch (Exception)
{
throw;
}
}
#endregion
#region
/// <summary>
/// 判断是否需要生成采集指令
/// </summary>
/// <param name="nextTaskTime"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
{
return true;
}
return false;
}
/// <summary>
/// 创建表的待发送的任务数据
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="nextTaskTime">采集频率对应的任务时间戳</param>
/// <param name="meterType">表类型</param>
/// <param name="taskCreateAction">具体的创建任务的委托</param>
/// <returns></returns>
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
{
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
//设备hash缓存key
string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}";
//设备分组集合key
string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}";
//List<T> meterInfos = new List<T>();
//decimal? cursor = null;
//string member = null;
//while (true)
//{
// var page = await _redisDataCacheService.GetAllPagedData2<T>(
// redisCacheDeviceGroupSetIndexKey,
// redisCacheDeviceInfoHashKey,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// if (!page.HasNext)
// {
// break;
// }
// cursor = page.NextScore;
// member = page.NextMember;
//}
//var page = await _redisDataCacheService.GetAllPagedData<T>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 10,
// lastScore: cursor,
// lastMember: member);
//meterInfos.AddRange(page.Items);
//if (meterInfos == null || meterInfos.Count <= 0)
//{
// timer.Stop();
// _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
// return;
//}
Dictionary<string, List<T>> keyValuePairs = FreeRedisProvider.Instance.HGetAll<List<T>>(redisCacheDeviceInfoHashKey);
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
List<T> meterInfos = new List<T>();
foreach (var item in keyValuePairs)
{
foreach (var subItem in item.Value)
{
if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity)
{
meterInfos.Add(subItem);
}
}
}
timer.Restart();
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.MeterId.ToString(),
processor: (data, groupIndex) =>
{
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
}
);
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
/// <summary>
/// 创建Kafka消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="kafkaTopicName">kafka主题名称</param>
/// <param name="options">任务查询条件</param>
/// <returns></returns>
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
{
if (string.IsNullOrWhiteSpace(kafkaTopicName))
{
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
return;
}
int pageNumber = 103;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
do
{
var stopwatch2 = Stopwatch.StartNew();
options.PageIndex = pageNumber++;
var pageResult = await _dbProvider.QueryAsync<T>(options);
hasNext = pageResult.HasNext;
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
items: pageResult.Items.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
{
_ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
}
);
stopwatch2.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
} while (hasNext);
stopwatch.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
T taskRecord, int partition) where T : class
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
// await _dataChannelManage.ProduceAsync<T>(topicName, taskRecord, partition);
}
/// <summary>
/// 构建报文保存对象
/// </summary>
/// <param name="ammeterInfo">电表信息</param>
/// <param name="timestamps">IoTDB存储时标</param>
/// <param name="builderResponse">报文构建返回结果</param>
/// <param name="itemCode">端到云协议采集项编码</param>
/// <param name="subItemCode">端到端采集项编码</param>
/// <param name="pendingCopyReadTime">待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳</param>
/// <param name="creationTime">数据创建时间戳</param>
/// <param name="packetType">数据包类型</param>
/// <param name="guidGenerator">Guid生成器</param>
/// <returns></returns>
protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType, IGuidGenerator guidGenerator)
{
try
{
string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
return new MeterReadingTelemetryPacketInfo()
{
SystemName = SystemType,
ProjectId = $"{ammeterInfo.ProjectID}",
DeviceType = $"{MeterTypeEnum.Ammeter}",
DeviceId = $"{ammeterInfo.MeterId}",
Timestamps = timestamps,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = creationTime,
MeterAddress = ammeterInfo.MeterAddress,
PacketType = (int)packetType,
AFN = builderResponse.AFn,
Fn = builderResponse.Fn,
Seq = builderResponse.Seq,
MSA = builderResponse.MSA,
FocusId = ammeterInfo.FocusId,
FocusAddress = ammeterInfo.FocusAddress,
ItemCode = itemCode,
SubItemCode = subItemCode,
TaskMark = taskMark,
IsSend = false,
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = guidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
FocusDensity = ammeterInfo.TimeDensity.GetFocusDensity(),
IsReceived = false,
ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
};
}
catch (Exception ex)
{
throw ex;
}
}
#endregion
}
}