2025-04-25 12:01:15 +08:00

1293 lines
59 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Encrypt;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDB.Context;
using JiShe.CollectBus.IoTDB.Interface;
using JiShe.CollectBus.IoTDB.Model;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.IoTDB.Provider;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Identity.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using static System.Runtime.CompilerServices.RuntimeHelpers;
namespace JiShe.CollectBus.ScheduledMeterReading
{
/// <summary>
/// 定时采集服务
/// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
// Logger used for all diagnostics emitted by this service.
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
// IoTDB provider; used in this file for metadata lookup and batch inserts of task records.
private readonly IIoTDbProvider _dbProvider;
// Kafka producer service injected through the constructor.
private readonly IProducerService _producerService;
// Redis cache service; used in this file to batch-insert meter information.
private readonly IRedisDataCacheService _redisDataCacheService;
// Kafka options; NumPartitions is read when initializing the device group balance cache.
private readonly KafkaOptionConfig _kafkaOptions;
// Application options; AutomaticVerificationTime and FirstCollectionTime are read here.
private readonly ServerApplicationOptions _applicationOptions;
// IoTDB runtime context; switched to the table session pool by the constructor.
private readonly IoTDBRuntimeContext _runtimeContext;
// Resolves the protocol plugin used to build command frames for a meter.
private readonly IProtocolService _protocolService;
// Page size passed to the IoTDB queries issued by the scheduled reading methods.
private int pageSize = 3000;

/// <summary>
/// Stores the injected dependencies and enables the table session pool on the IoTDB runtime context.
/// </summary>
/// <param name="logger">Logger for this service.</param>
/// <param name="producerService">Kafka producer service.</param>
/// <param name="redisDataCacheService">Redis cache service for meter data.</param>
/// <param name="dbProvider">IoTDB provider.</param>
/// <param name="runtimeContext">IoTDB runtime context; its <c>UseTableSessionPool</c> flag is set to <c>true</c>.</param>
/// <param name="protocolService">Protocol plugin resolver.</param>
/// <param name="kafkaOptions">Kafka options wrapper; <c>Value</c> is captured.</param>
/// <param name="applicationOptions">Application options wrapper; <c>Value</c> is captured.</param>
public BasicScheduledMeterReadingService(
    ILogger<BasicScheduledMeterReadingService> logger,
    IProducerService producerService,
    IRedisDataCacheService redisDataCacheService,
    IIoTDbProvider dbProvider,
    IoTDBRuntimeContext runtimeContext,
    IProtocolService protocolService,
    IOptions<KafkaOptionConfig> kafkaOptions,
    IOptions<ServerApplicationOptions> applicationOptions)
{
    // Plain service references, in parameter order.
    _logger = logger;
    _producerService = producerService;
    _redisDataCacheService = redisDataCacheService;
    _dbProvider = dbProvider;
    _runtimeContext = runtimeContext;
    _protocolService = protocolService;

    // Unwrap the option snapshots.
    _kafkaOptions = kafkaOptions.Value;
    _applicationOptions = applicationOptions.Value;

    // This service works against the table session pool.
    _runtimeContext.UseTableSessionPool = true;
}
/// <summary>
/// System type. Used in this class as a segment of the Redis cache keys and as the
/// <c>SystemName</c> of generated task records.
/// </summary>
public abstract string SystemType { get; }

/// <summary>
/// Deployment tag of the application server. Used in this class as a segment of the Redis cache keys.
/// </summary>
public abstract string ServerTagName { get; }

/// <summary>
/// Ammeter daily-freeze collection item codes. Items in this list are skipped when the
/// periodic reading tasks are built.
/// </summary>
protected List<string> DayFreezeCodes = new List<string>
{
    "0D_3",
    "0D_4",
    "0D_161",
    "0D_162",
    "0D_163",
    "0D_164",
    "0D_165",
    "0D_166",
    "0D_167",
    "0D_168",
    "0C_149",
};

/// <summary>
/// Ammeter monthly-freeze collection item codes. Items in this list are skipped when the
/// periodic reading tasks are built.
/// </summary>
protected List<string> MonthFreezeCodes = new List<string>
{
    "0D_177",
    "0D_178",
    "0D_179",
    "0D_180",
    "0D_181",
    "0D_182",
    "0D_183",
    "0D_184",
    "0D_193",
    "0D_195",
};
/// <summary>
/// Gets the list of collection (gather) items. The base implementation is a placeholder
/// that must be overridden per system type.
/// </summary>
/// <returns>Never returns in the base class.</returns>
/// <exception cref="NotImplementedException">Always thrown by the base implementation.</exception>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
    => throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
/// <summary>
/// Builds the pending command tasks for every task descriptor cached in Redis under
/// <c>{CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*</c>.
/// </summary>
/// <remarks>
/// For each cached descriptor the method:
/// <list type="number">
/// <item>parses the meter type and collection frequency from the Redis key segments 4 and 5;</item>
/// <item>when the current minute equals the configured automatic verification time, starts the
/// ammeter time-synchronization fan-out (ammeter keys only);</item>
/// <item>skips the descriptor if <c>IsTaskTime</c> reports that its next task time is not due;</item>
/// <item>starts the ammeter or water-meter task creation and stores the results in IoTDB;</item>
/// <item>advances <c>LastTaskTime</c>/<c>NextTaskTime</c> and writes the descriptor back to Redis.</item>
/// </list>
/// The publish tasks and inserts are started without being awaited, exactly as before.
/// Unlike the previous revision, a non-verification minute no longer aborts the whole method.
/// </remarks>
/// <returns>A task that completes once every cached descriptor has been examined.</returns>
public virtual async Task CreateToBeIssueTasks()
{
    var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
    var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
    if (taskInfos == null || taskInfos.Length <= 0)
    {
        _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
        return;
    }

    var currentTime = DateTime.Now;

    // Ammeter broadcast time synchronization: compared at minute resolution against the configured time.
    string currentTimeStr = $"{currentTime:HH:mm:00}";
    bool isAutomaticVerificationTime = string.Equals(
        currentTimeStr,
        _applicationOptions.AutomaticVerificationTime,
        StringComparison.CurrentCultureIgnoreCase);
    if (!isAutomaticVerificationTime)
    {
        // Only informational: the regular collection tasks below must still be built.
        _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
    }

    // Scheduled reading.
    foreach (var item in taskInfos)
    {
        var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
        if (tasksToBeIssueModel == null)
        {
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
            continue;
        }

        // item is the Redis key of a cached task descriptor. Segments:
        // [0] CollectBus, [1] system type, [2] server tag, [3] TaskInfo, [4] meter type, [5] collection frequency.
        var tempArryay = item.Split(":");
        if (tempArryay.Length < 6 || !int.TryParse(tempArryay[5], out int timeDensity))
        {
            // A malformed key must not abort the remaining descriptors.
            _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}格式无效,-104");
            continue;
        }

        string meteryType = tempArryay[4]; // meter type
        if (timeDensity > 15)
        {
            timeDensity = 15;
        }

        bool isAmmeterKey = meteryType == MeterTypeEnum.Ammeter.ToString();

        // Time synchronization targets ammeters only, so it is started for ammeter keys only.
        if (isAutomaticVerificationTime && isAmmeterKey)
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, data, groupIndex, timestamps) =>
                {
                    await AmmeterScheduledAutomaticVerificationTime(density, data, groupIndex, timestamps);
                });
        }

        // The scheduler runs every 10 seconds, so check whether this descriptor is actually due.
        if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
        {
            _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
            continue;
        }

        // NextTaskTime was already aligned to the next collection point when the meters were cached at startup.
        var currentTaskTime = tasksToBeIssueModel.NextTaskTime;
        var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();

        if (isAmmeterKey)
        {
            _ = CreateMeterPublishTask<AmmeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.Ammeter,
                taskCreateAction: async (density, data, groupIndex, timestamps) =>
                {
                    var tempTask = await AmmerterCreatePublishTaskAction(density, data, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}");
                        return;
                    }
                    _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
                });
        }
        else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
        {
            _ = CreateMeterPublishTask<WatermeterInfo>(
                timeDensity: timeDensity,
                nextTaskTime: currentTaskTime,
                meterType: MeterTypeEnum.WaterMeter,
                taskCreateAction: async (density, data, groupIndex, timestamps) =>
                {
                    var tempTask = await WatermeterCreatePublishTaskAction(density, data, groupIndex, timestamps);
                    if (tempTask == null || tempTask.Count <= 0)
                    {
                        _logger.LogWarning($"水表 {data.Name} 任务数据构建失败:{data.Serialize()}");
                        return;
                    }
                    _ = _dbProvider.BatchInsertAsync(metadata, tempTask);
                });
        }
        else
        {
            _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
        }

        _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");

        // Advance to the next task point here, so that the schedule is driven from this method
        // and a failure in task processing cannot stall task creation.
        tasksToBeIssueModel.LastTaskTime = currentTaskTime;
        tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
        await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
    }

    // Scheduled ammeter valve-control handling.
    _ = AmmeterScheduledAutoValveControl();
}
#region
/// <summary>
/// Gets the ammeter information list. The base implementation is a placeholder
/// that must be overridden per system type.
/// </summary>
/// <param name="gatherCode">Collection-end code.</param>
/// <returns>Never returns in the base class.</returns>
/// <exception cref="NotImplementedException">Always thrown by the base implementation.</exception>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
/// <summary>
/// Initializes the ammeter cache: writes one task descriptor per collection frequency to Redis,
/// derives missing item codes from each meter's data types, batch-inserts the meters into the
/// Redis meter cache and initializes the device group balance control.
/// </summary>
/// <param name="gatherCode">Collection-end code, passed through to <c>GetAmmeterInfoList</c>.</param>
/// <returns>A task that completes when the cache has been populated.</returns>
/// <exception cref="NullReferenceException">
/// Thrown when no ammeter data or no gather-item data is available (exception type kept for
/// compatibility with existing callers).
/// </exception>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
    // Do not delete the commented-out code below.
    //#if DEBUG
    // var timeDensity = "15";
    // var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
    // var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
    // var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
    // List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
    // List<string> focusAddressDataLista = new List<string>();
    // var timer1 = Stopwatch.StartNew();
    // var allIds = new HashSet<string>();
    // decimal? score = null;
    // string member = null;
    // while (true)
    // {
    // var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
    // redisCacheMeterInfoHashKeyTemp,
    // redisCacheMeterInfoZSetScoresIndexKeyTemp,
    // pageSize: 1000,
    // lastScore: score,
    // lastMember: member);
    // meterInfos.AddRange(page.Items);
    // focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
    // foreach (var item in page.Items)
    // {
    // if (!allIds.Add(item.MemberId))
    // {
    // _logger.LogError($"{item.MemberId}Duplicate data found!");
    // }
    // }
    // if (!page.HasNext) break;
    // score = page.NextScore;
    // member = page.NextMember;
    // }
    // timer1.Stop();
    // _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
    // DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
    // return;
    //#else
    // var meterInfos = await GetAmmeterInfoList(gatherCode);
    //#endif
    var meterInfos = await GetAmmeterInfoList(gatherCode);
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
    }

    // Gather-item definitions, used to map a meter's data types to item codes.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
    }

    var timer = Stopwatch.StartNew();

    // Device ids handed to the balance control that distributes work across Kafka partitions.
    List<string> deviceIds = new List<string>();

    // Group by collection frequency. Materialized once because the groups are iterated twice below.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();

    if (_applicationOptions.FirstCollectionTime.HasValue == false)
    {
        _applicationOptions.FirstCollectionTime = DateTime.Now;
    }

    // First write the task descriptor for every collection frequency.
    foreach (var item in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = null,
            TimeDensity = item.Key,
            // The next collection time is derived from the first collection time.
            NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
        };

        // TODO: collection points missed between the first collection time and now could be
        // handled through IoTDB time-series storage.
        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    // TODO: item codes excluded from automatic derivation (transparent forwarding:
    // peak/flat/valley periods, switch on/off, special meters).
    const string excludeItemCode = "10_98,10_94";

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";

        List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();

        // Group the meters by concentrator address.
        var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
        foreach (var item in meterInfoGroup)
        {
            // Meters without a concentrator address are skipped.
            if (string.IsNullOrWhiteSpace(item.Key))
            {
                continue;
            }

            foreach (var ammeter in item)
            {
                deviceIds.Add(ammeter.MeterId.ToString());

                // Derive the item codes from the data types when none are configured.
                if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
                {
                    var itemArr = ammeter.DataTypes.Split(',').ToList();
                    List<string> itemCodeList = new List<string>();

                    foreach (var dataType in itemArr)
                    {
                        var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
                        if (gatherItem != null && !excludeItemCode.Contains(gatherItem.ItemCode))
                        {
                            itemCodeList.Add(gatherItem.ItemCode);
                        }
                    }

                    // Special data types are appended once per meter. They used to be evaluated
                    // inside the loop above, which added one duplicate per data type.
                    if (itemArr.Contains("95")) // Delixi DTS
                    {
                        itemCodeList.Add("10_95");
                    }
                    if (itemArr.Contains("109")) // WAVE_109
                    {
                        itemCodeList.Add("10_109");
                    }

                    // Stored as a JSON string.
                    ammeter.ItemCodes = itemCodeList.Serialize();
                    if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
                    {
                        ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
                    }
                }

                ammeterInfos.Add(ammeter);
            }
        }

        await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
            redisCacheMeterInfoHashKey,
            redisCacheMeterInfoSetIndexKey,
            redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
    }

    // Initialize the device group balance control.
    if (deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    timer.Stop();
    _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
}
/// <summary>
/// One-minute ammeter reading: looks up the one-minute task descriptor in Redis and starts the
/// Kafka hand-off of the task records already stored in IoTDB for its last task time.
/// It only fetches and issues existing task data; it does not build tasks.
/// </summary>
/// <remarks>
/// The previous revision used a collection frequency of 5 here, so the one-minute job read the
/// five-minute descriptor; it now uses 1. The error log also named the fifteen-minute method.
/// </remarks>
/// <returns>A task that completes after the hand-off has been started (the hand-off itself is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
    int timeDensity = 1;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Select the task records whose pending read time equals the last task time (Unix nanoseconds).
    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // Started without being awaited, as before.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
/// <summary>
/// Five-minute ammeter reading: looks up the five-minute task descriptor in Redis and starts the
/// Kafka hand-off of the task records stored in IoTDB for its last task time.
/// </summary>
/// <remarks>
/// The error log previously named <c>AmmeterScheduledMeterFifteenMinuteReading</c>; it now names this method.
/// </remarks>
/// <returns>A task that completes after the hand-off has been started (the hand-off itself is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
{
    int timeDensity = 5;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Select the task records whose pending read time equals the last task time (Unix nanoseconds).
    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // Started without being awaited, as before.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });
}
/// <summary>
/// Fifteen-minute ammeter reading: looks up the fifteen-minute task descriptor in Redis and starts
/// the Kafka hand-off of the task records stored in IoTDB for its last task time.
/// </summary>
/// <returns>A task that completes after the hand-off has been started (the hand-off itself is not awaited).</returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
{
    int timeDensity = 15;
    var descriptorKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
    var descriptor = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(descriptorKey);

    // Nothing to issue until a task round has been created for this frequency.
    var lastTaskTime = descriptor?.LastTaskTime;
    if (!lastTaskTime.HasValue)
    {
        _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Filter: PendingCopyReadTime equals the last task time, expressed in Unix nanoseconds.
    var readTimeFilter = new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = lastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds()
    };
    var filters = new List<QueryCondition> { readTimeFilter };

    var queryOptions = new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = filters,
    };

    // Fire-and-forget hand-off to Kafka.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, queryOptions);
}
/// <summary>
/// Builds the task records (one per collection item code) to be sent for a single ammeter.
/// </summary>
/// <remarks>
/// Changes from the previous revision:
/// <list type="bullet">
/// <item>a missing protocol plugin is logged and yields <c>null</c> instead of a
/// <see cref="NullReferenceException"/> at <c>BuildAsync</c>;</item>
/// <item>the automatic-report filter tested <c>tempSubCodes.Contains("0C_149")</c> (the list being
/// built, so never true); it now tests the configured codes;</item>
/// <item>a non-numeric concentrator address is rejected with the existing "invalid address" log
/// instead of throwing from <c>Convert.ToInt32</c>.</item>
/// </list>
/// </remarks>
/// <param name="timeDensity">Collection frequency. Not used by this method's body.</param>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="groupIndex">Group index of the concentrator. Not used by this method's body.</param>
/// <param name="timestamps">Timestamp of the collection point; stored as <c>PendingCopyReadTime</c>.</param>
/// <returns>
/// The generated task records (possibly empty), or <c>null</c> when the meter fails validation
/// or no protocol plugin is available.
/// </returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;

    // Resolve the protocol plugin from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}没有找到对应的协议组件,-105");
        return null;
    }

    // TODO: record the time points of meters that need a supplementary reading in a dedicated
    // cache. How should this be compensated if this thread fails?
    if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
        return null;
    }

    // Carrier-wave meters are not handled.
    if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
        return null;
    }

    // State 2 is reported as "disabled" by the log message below.
    if (ammeterInfo.State.Equals(2))
    {
        _logger.LogWarning($"{nameof(AmmerterCreatePublishTaskAction)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
        return null;
    }

    // NOTE: a disabled check used to live here that skipped concentrators offline for more than
    // one day or configured for automatic reporting (IsGennerateCmd(LastTime, -1)).

    if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信区号为空");
        return null;
    }

    if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址为空");
        return null;
    }

    // The address must be numeric and no greater than 65535.
    if (!int.TryParse(ammeterInfo.Address, out int focusCommunicationAddress) || focusCommunicationAddress > 65535)
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},集中器通信地址无效,确保大于65535");
        return null;
    }

    if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
    {
        _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 表ID:{ammeterInfo.MeterId},非有效测量点号({ammeterInfo.MeteringCode})");
        return null;
    }

    // A null deserialization result is treated as "no codes".
    List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>() ?? new List<string>();

    // TODO: meters that report automatically are only actively polled for the items below.
    if (ammeterInfo.AutomaticReport.Equals(1))
    {
        var tempSubCodes = new List<string>();
        if (tempCodes.Contains("0C_49"))
        {
            tempSubCodes.Add("0C_49");
        }
        if (tempCodes.Contains("0C_149"))
        {
            tempSubCodes.Add("0C_149");
        }
        if (ammeterInfo.ItemCodes.Contains("10_97"))
        {
            tempSubCodes.Add("10_97");
        }

        if (tempSubCodes.Count <= 0)
        {
            // Nothing to poll actively for this automatically-reporting meter.
            return null;
        }

        tempCodes = tempSubCodes;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
    foreach (var tempItem in tempCodes)
    {
        // Daily-freeze and monthly-freeze items are excluded from the periodic reading.
        if (DayFreezeCodes.Contains(tempItem) || MonthFreezeCodes.Contains(tempItem))
        {
            continue;
        }

        // TODO: special meters.
        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = ammeterInfo.FocusAddress,
            Pn = ammeterInfo.MeteringCode,
            ItemCode = tempItem,
        });

        // Skip items for which the plugin produced no frame.
        if (builderResponse == null || builderResponse.Data == null || builderResponse.Data.Length <= 0)
        {
            continue;
        }

        string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
        var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
        {
            SystemName = SystemType,
            ProjectId = $"{ammeterInfo.ProjectID}",
            DeviceType = $"{MeterTypeEnum.Ammeter}",
            DeviceId = $"{ammeterInfo.MeterId}",
            Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
            DatabaseBusiID = ammeterInfo.DatabaseBusiID,
            PendingCopyReadTime = timestamps,
            CreationTime = currentTime,
            MeterAddress = ammeterInfo.AmmerterAddress,
            AFN = builderResponse.AFn,
            Fn = builderResponse.Fn,
            Seq = builderResponse.Seq,
            MSA = builderResponse.MSA,
            ItemCode = tempItem,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = ammeterInfo.MeteringCode,
            IssuedMessageId = GuidGenerator.Create().ToString(),
            IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
            IsReceived = false,
            ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
        };
        taskList.Add(meterReadingRecords);
    }

    return taskList;
}
/// <summary>
/// Gets the ammeter valve-control settings. The base implementation is a placeholder that must
/// be overridden per system type.
/// </summary>
/// <remarks>
/// The exception message previously named <c>GetAmmeterInfoList</c>; it now names this method.
/// </remarks>
/// <param name="currentTime">Valve-control time.</param>
/// <returns>Never returns in the base class.</returns>
/// <exception cref="NotImplementedException">Always thrown by the base implementation.</exception>
public virtual Task<List<AmmeterAutoValveControlSetting>> GetAmmeterAutoValveControlSetting(string currentTime)
{
    throw new NotImplementedException($"{nameof(GetAmmeterAutoValveControlSetting)}请根据不同系统类型进行实现");
}
/// <summary>
/// Automatic ammeter valve control. The base implementation is a placeholder that must be
/// overridden per system type; it throws synchronously when invoked.
/// </summary>
/// <returns>Never returns in the base class.</returns>
/// <exception cref="NotImplementedException">Always thrown by the base implementation.</exception>
public virtual Task AmmeterScheduledAutoValveControl()
    => throw new NotImplementedException($"{nameof(AmmeterScheduledAutoValveControl)}请根据不同系统类型进行实现");
/// <summary>
/// Automatic ammeter time synchronization: when the current minute equals the configured
/// automatic verification time, builds the time-sync command (item code <c>10_01</c>) for the
/// given ammeter, stores the task record in IoTDB and starts its hand-off to Kafka.
/// </summary>
/// <remarks>
/// Changes from the previous revision: a missing or empty build response is logged and skipped
/// instead of causing a <see cref="NullReferenceException"/>; log messages copied from the
/// valve-control routine now name this method; the no-op <c>try/catch { throw; }</c> wrapper
/// and the unreachable empty-list check were removed.
/// </remarks>
/// <param name="timeDensity">Collection frequency. Not used by this method's body.</param>
/// <param name="ammeterInfo">Ammeter information.</param>
/// <param name="groupIndex">Group index of the concentrator. Not used by this method's body.</param>
/// <param name="timestamps">Timestamp of the collection point. Not used by this method's body.</param>
/// <returns>A task that completes once the record is stored and the Kafka hand-off has been started.</returns>
public virtual async Task AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;
    string currentTimeStr = $"{currentTime:HH:mm:00}";

    // Only act during the configured verification minute.
    if (!string.Equals(currentTimeStr, _applicationOptions.AutomaticVerificationTime, StringComparison.CurrentCultureIgnoreCase))
    {
        _logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间");
        return;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
    var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
    var temCode = "10_01";

    // Resolve the protocol plugin from the meter's brand type.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
    if (protocolPlugin == null)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时运行时间{currentTime}没有找到对应的协议组件,-105");
        return;
    }

    ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
    {
        FocusAddress = ammeterInfo.FocusAddress,
        Pn = ammeterInfo.MeteringCode,
        ItemCode = temCode,
        SubProtocolRequest = new SubProtocolBuildRequest()
        {
            MeterAddress = ammeterInfo.AmmerterAddress,
            Password = ammeterInfo.Password,
            ItemCode = T645PacketItemCodeConst.C08,
        }
    });

    // Without a frame there is nothing to store or send.
    if (builderResponse == null || builderResponse.Data == null || builderResponse.Data.Length <= 0)
    {
        _logger.LogError($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}校时指令未能正确获取报文,-106");
        return;
    }

    string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq);
    var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
    {
        SystemName = SystemType,
        ProjectId = $"{ammeterInfo.ProjectID}",
        DeviceType = $"{MeterTypeEnum.Ammeter}",
        DeviceId = $"{ammeterInfo.MeterId}",
        Timestamps = currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(),
        DatabaseBusiID = ammeterInfo.DatabaseBusiID,
        PendingCopyReadTime = currentTime,
        CreationTime = currentTime,
        MeterAddress = ammeterInfo.AmmerterAddress,
        AFN = builderResponse.AFn,
        Fn = builderResponse.Fn,
        Seq = builderResponse.Seq,
        MSA = builderResponse.MSA,
        ItemCode = temCode,
        TaskMark = taskMark,
        IsSend = false,
        ManualOrNot = false,
        Pn = ammeterInfo.MeteringCode,
        IssuedMessageId = GuidGenerator.Create().ToString(),
        IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
        IsReceived = false,
        ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(),
    };
    taskList.Add(meterReadingRecords);

    // Persist the task record.
    await _dbProvider.BatchInsertAsync(metadata, taskList);

    // Push the task to Kafka; started without being awaited, as before.
    _ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<MeterReadingTelemetryPacketInfo>(
        items: taskList,
        deviceIdSelector: data => data.DeviceId,
        processor: (data, partitionIndex) =>
        {
            _ = KafkaProducerIssuedMessageAction(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, data, partitionIndex);
        }
    );

    // TODO: store the record and push it to the new service.
}
#endregion
#region
/// <summary>
/// Gets the water meter information list. The base implementation is a placeholder that must
/// be overridden per system type.
/// </summary>
/// <param name="gatherCode">Collection-end code.</param>
/// <returns>Never returns in the base class.</returns>
/// <exception cref="NotImplementedException">Always thrown by the base implementation.</exception>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
    => throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
/// <summary>
/// Initializes the water meter cache: writes one task descriptor per collection frequency to
/// Redis, batch-inserts the meters into the Redis meter cache and initializes the device group
/// balance control.
/// </summary>
/// <remarks>
/// Changes from the previous revision:
/// <list type="bullet">
/// <item>the meter cache keys were built with <c>MeterTypeEnum.Ammeter</c>, so water meters were
/// written into the ammeter cache; they now use <c>MeterTypeEnum.WaterMeter</c>, matching the
/// task descriptor key written by this method;</item>
/// <item>the accumulated meter list was re-inserted once per concentrator group; it is now
/// inserted once per collection frequency, as in <c>InitAmmeterCacheData</c>;</item>
/// <item>log and exception messages now name this method.</item>
/// </list>
/// </remarks>
/// <param name="gatherCode">Collection-end code, passed through to <c>GetWatermeterInfoList</c>.</param>
/// <returns>A task that completes when the cache has been populated.</returns>
/// <exception cref="NullReferenceException">
/// Thrown when no water meter data or no gather-item data is available (exception type kept for
/// compatibility with existing callers).
/// </exception>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
    var meterInfos = await GetWatermeterInfoList(gatherCode);
    if (meterInfos == null || meterInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
    }

    // Gather-item definitions must exist, although this method does not otherwise use them.
    var gatherItemInfos = await GetGatherItemByDataTypes();
    if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
    {
        throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
    }

    // Device ids handed to the balance control that distributes work across Kafka partitions.
    List<string> deviceIds = new List<string>();

    // Group by collection frequency. Materialized once because the groups are iterated twice below.
    var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity).ToList();

    if (_applicationOptions.FirstCollectionTime.HasValue == false)
    {
        _applicationOptions.FirstCollectionTime = DateTime.Now;
    }

    // First write the task descriptor for every collection frequency.
    foreach (var item in meterInfoGroupByTimeDensity)
    {
        TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
        {
            LastTaskTime = null,
            TimeDensity = item.Key,
            // The next collection time is derived from the first collection time.
            NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
        };

        // TODO: collection points missed between the first collection time and now could be
        // handled through IoTDB time-series storage.
        var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
        await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
    }

    foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
    {
        var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";
        var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}";

        List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();

        // Group the meters by concentrator address.
        var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
        foreach (var item in meterInfoGroup)
        {
            // Meters without a concentrator address are skipped.
            if (string.IsNullOrWhiteSpace(item.Key))
            {
                continue;
            }

            foreach (var subItem in item)
            {
                deviceIds.Add(subItem.MeterId.ToString());
                watermeterInfo.Add(subItem);
            }
        }

        // As before, nothing is written when no meter of this frequency has a concentrator address.
        if (watermeterInfo.Count > 0)
        {
            await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
                redisCacheMeterInfoHashKey,
                redisCacheMeterInfoSetIndexKey,
                redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
        }
    }

    // Initialize the device group balance control.
    if (deviceIds.Count <= 0)
    {
        _logger.LogError($"{nameof(InitWatermeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
    }
    else
    {
        DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions);
    }

    _logger.LogInformation($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据完成");
}
/// <summary>
/// Water meter reading: looks up the 60-minute water meter task descriptor in Redis and starts
/// the Kafka hand-off of the task records stored in IoTDB for its last task time.
/// </summary>
/// <remarks>
/// The error log previously named <c>AmmeterScheduledMeterFifteenMinuteReading</c>; it now names this method.
/// </remarks>
/// <returns>A task that completes after the hand-off has been started (the hand-off itself is not awaited).</returns>
public virtual async Task WatermeterScheduledMeterAutoReading()
{
    // Water meters currently have a single collection frequency: 60 minutes.
    int timeDensity = 60;
    var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
    var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
    if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
    {
        _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
        return;
    }

    // Select the task records whose pending read time equals the last task time (Unix nanoseconds).
    var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
    var conditions = new List<QueryCondition>();
    conditions.Add(new QueryCondition()
    {
        Field = "PendingCopyReadTime",
        Operator = "=",
        IsNumber = true,
        Value = pendingCopyReadTime
    });

    // Started without being awaited, as before.
    _ = CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, new IoTDBQueryOptions()
    {
        TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
        PageIndex = 1,
        PageSize = pageSize,
        Conditions = conditions,
    });

    _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
/// <summary>
/// Builds the pending (not yet sent) meter-reading task records for a single water meter.
/// </summary>
/// <param name="timeDensity">Collection interval in minutes. Not used when building the records.</param>
/// <param name="watermeter">Water meter (or water flowmeter) the tasks are built for.</param>
/// <param name="groupIndex">Index of the group the concentrator belongs to. Not used by this method.</param>
/// <param name="timestamps">Task batch time; stored as <c>PendingCopyReadTime</c> on every record.</param>
/// <returns>
/// The records that were built (possibly empty), or <c>null</c> when the meter type is not a
/// water meter / water flowmeter or when no protocol plugin is found for the meter.
/// </returns>
private async Task<List<MeterReadingTelemetryPacketInfo>> WatermeterCreatePublishTaskAction(int timeDensity
    , WatermeterInfo watermeter, int groupIndex, DateTime timestamps)
{
    var currentTime = DateTime.Now;

    // Pick the name used to select the item codes below: the link type by default,
    // the brand for a few special water meters and for all flowmeters.
    string typeName;
    if (watermeter.MeterType == MeterTypeEnum.WaterMeter)
    {
        typeName = watermeter.LinkType;
        if (watermeter.MeterBrand.Contains("泉高阀门") || watermeter.MeterBrand.Equals("LXSY-山水翔"))
        {
            typeName = watermeter.MeterBrand;
        }
    }
    else if (watermeter.MeterType == MeterTypeEnum.WaterMeterFlowmeter)
    {
        typeName = watermeter.MeterBrand;
    }
    else
    {
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 水表类型错误:{watermeter.Serialize()}");
        return null;
    }

    List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();

    // TODO: resolve the item codes from the protocol pool instead of hard-coding them here.
    List<string> tempCodes = new List<string>() { "10_1" };
    bool usesMeterProtocol = watermeter.MeterTypeName.Equals("水表")
        && (watermeter.Protocol.Equals((int)MeterLinkProtocol.CJT_188_2018)
            || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_1997)
            || watermeter.Protocol.Equals((int)MeterLinkProtocol.DLT_645_2007));
    if (usesMeterProtocol)
    {
        // Water meters on CJT188-2018 / DLT645-1997 / DLT645-2007 use 0C_129,
        // except the one brand below which uses 0C_188.
        tempCodes = watermeter.MeterBrand.Contains("炬华有线")
            ? new List<string>() { "0C_188" }
            : new List<string>() { "0C_129" };
    }
    else
    {
        // Known flowmeter models; each currently maps to the same item code as the default.
        // A null type name simply matches no case and keeps the default.
        switch (typeName?.Trim())
        {
            case "西恩超声波流量计":
            case "江苏华海涡街流量计积算仪":
            case "V880BR涡街流量计":
            case "拓思特涡街流量计H880BR":
                tempCodes = new List<string>() { "10_1" };
                break;
        }
    }

    // Resolve the protocol plugin for this meter.
    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code);
    if (protocolPlugin == null)
    {
        // Without a plugin no packet can be built; stop here instead of dereferencing null below.
        _logger.LogError($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}没有找到对应的协议组件,-105");
        return null;
    }

    foreach (var tempItem in tempCodes)
    {
        // Skip item codes that are listed in the day-freeze or month-freeze configuration.
        if (DayFreezeCodes.Contains(tempItem) || MonthFreezeCodes.Contains(tempItem))
        {
            continue;
        }

        ProtocolBuildResponse builderResponse = await protocolPlugin.BuildAsync(new ProtocolBuildRequest()
        {
            FocusAddress = watermeter.FocusAddress,
            Pn = watermeter.MeteringCode,
            ItemCode = tempItem,
            SubProtocolRequest = new SubProtocolBuildRequest()
            {
                MeterAddress = watermeter.MeterAddress,
                Password = watermeter.Password,
                ItemCode = tempItem,
            }
        });

        // No usable packet for this item: report it and move on to the next item.
        if (builderResponse == null || builderResponse.Data == null || builderResponse.Data.Length <= 0)
        {
            _logger.LogWarning($"{nameof(WatermeterCreatePublishTaskAction)} 集中器{watermeter.FocusAddress}的水表{watermeter.Name}采集项{tempItem}未能正确获取报文。");
            continue;
        }

        string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, watermeter.MeteringCode, builderResponse.MSA, builderResponse.Seq);
        var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
        {
            SystemName = SystemType,
            ProjectId = $"{watermeter.ProjectID}",
            // NOTE(review): a water-meter record is tagged as Ammeter here, which looks like a
            // copy-paste from the ammeter path. Left unchanged because consumers of DeviceType
            // are not visible from this method; confirm before switching to the water meter type.
            DeviceType = $"{MeterTypeEnum.Ammeter}",
            DeviceId = $"{watermeter.MeterId}",
            Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
            DatabaseBusiID = watermeter.DatabaseBusiID,
            PendingCopyReadTime = timestamps,
            CreationTime = currentTime,
            MeterAddress = watermeter.MeterAddress,
            AFN = builderResponse.AFn,
            Fn = builderResponse.Fn,
            Seq = builderResponse.Seq,
            MSA = builderResponse.MSA,
            ItemCode = tempItem,
            TaskMark = taskMark,
            IsSend = false,
            ManualOrNot = false,
            Pn = watermeter.MeteringCode,
            IssuedMessageId = GuidGenerator.Create().ToString(),
            IssuedMessageHexString = Convert.ToHexString(builderResponse.Data),
            IsReceived = false,
            ScoreValue = $"{watermeter.FocusAddress}.{taskMark}".Md5Fun(),
        };
        taskList.Add(meterReadingRecords);
    }

    return taskList;
}
#endregion
#region
/// <summary>
/// Determines whether the collection commands for a task should be generated now.
/// </summary>
/// <param name="nextTaskTime">Time of the next task to compare against.</param>
/// <param name="timeDensity">Minutes added to the current local time before the comparison; defaults to 0.</param>
/// <returns>
/// <c>true</c> when the current local time plus <paramref name="timeDensity"/> minutes is at or
/// after <paramref name="nextTaskTime"/>; otherwise <c>false</c>.
/// </returns>
protected bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
    DateTime shiftedNow = DateTime.Now.AddMinutes(timeDensity);
    return shiftedNow >= nextTaskTime;
}
/// <summary>
/// Loads all cached meters for the given meter type and collection interval, then invokes the
/// task-creation delegate for each of them through the device-group throttle.
/// </summary>
/// <typeparam name="T">Cached meter model type.</typeparam>
/// <param name="timeDensity">Collection interval in minutes; part of the cache keys and passed to the delegate.</param>
/// <param name="nextTaskTime">Task time for this interval; passed to the delegate.</param>
/// <param name="meterType">Meter type; part of the cache keys.</param>
/// <param name="taskCreateAction">
/// Delegate that creates the tasks for one meter. Receives the interval, the meter,
/// the group index supplied by the throttle, and <paramref name="nextTaskTime"/>.
/// </param>
/// <returns>A task that completes after the throttled processing call has returned.</returns>
protected async Task CreateMeterPublishTask<T>(int timeDensity, DateTime nextTaskTime, MeterTypeEnum meterType, Action<int, T, int, DateTime> taskCreateAction) where T : DeviceCacheBasicModel
{
    var timer = Stopwatch.StartNew();

    // Cache keys for the meters of this type and interval.
    var redisCacheMeterInfoHashKeyTemp = string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity);
    var redisCacheMeterInfoZSetScoresIndexKeyTemp = string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity);

    // Read the cache page by page; the score/member pair of one page is the cursor for the next.
    List<T> meterInfos = new List<T>();
    decimal? cursor = null;
    string member = null;
    bool hasNext;
    do
    {
        var page = await _redisDataCacheService.GetAllPagedData<T>(
            redisCacheMeterInfoHashKeyTemp,
            redisCacheMeterInfoZSetScoresIndexKeyTemp,
            pageSize: 1000,
            lastScore: cursor,
            lastMember: member);

        meterInfos.AddRange(page.Items);
        cursor = page.HasNext ? page.NextScore : null;
        member = page.HasNext ? page.NextMember : null;
        hasNext = page.HasNext;
    } while (hasNext);

    if (meterInfos.Count <= 0)
    {
        timer.Stop();
        _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
        return;
    }

    // Meters are grouped by concentrator address; the throttle supplies the group index.
    await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
        items: meterInfos,
        deviceIdSelector: data => data.FocusAddress,
        processor: (data, groupIndex) =>
        {
            taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
        }
    );

    timer.Stop();
    _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
/// <summary>
/// Pages through the task records matching <paramref name="options"/> and pushes each record
/// to the given Kafka topic.
/// </summary>
/// <typeparam name="T">Task record entity type.</typeparam>
/// <param name="kafkaTopicName">Kafka topic the records are produced to. Nothing is sent when it is null or blank.</param>
/// <param name="options">Task query conditions. Its <c>PageIndex</c> is overwritten for every page that is fetched.</param>
/// <returns>A task that completes once every page has been handed to the producer.</returns>
protected async Task CreateMeterKafkaTaskMessage<T>(string kafkaTopicName, IoTDBQueryOptions options) where T : IoTEntity, new()
{
    if (string.IsNullOrWhiteSpace(kafkaTopicName))
    {
        _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
        return;
    }

    int pageNumber = 0;
    bool hasNext;
    var stopwatch = Stopwatch.StartNew();
    do
    {
        options.PageIndex = pageNumber++;
        var pageResult = await _dbProvider.QueryAsync<T>(options);
        hasNext = pageResult.HasNext;

        await DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
            items: pageResult.Items.ToList(),
            deviceIdSelector: data => data.DeviceId,
            processor: (data, groupIndex) =>
            {
                // Produce to the topic the caller asked for. The send is not awaited here,
                // so a fault-only continuation logs failures instead of losing them.
                _ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex)
                    .ContinueWith(
                        faulted => _logger.LogError(faulted.Exception, $"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题消息推送失败"),
                        TaskContinuationOptions.OnlyOnFaulted);
            }
        );
    } while (hasNext);

    stopwatch.Stop();
    _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
/// Produces a single task record to a Kafka topic.
/// </summary>
/// <typeparam name="T">Type of the record to send.</typeparam>
/// <param name="topicName">Topic name.</param>
/// <param name="taskRecord">Task record to send.</param>
/// <param name="partition">Target partition, i.e. the index of the group the concentrator number falls into.</param>
/// <returns>A task that completes when the producer call completes.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="topicName"/> is null or blank, or <paramref name="taskRecord"/> is null.
/// Because the method is async, the exception is carried by the returned task.
/// </exception>
protected async Task KafkaProducerIssuedMessageAction<T>(string topicName,
    T taskRecord, int partition) where T : class
{
    if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
    {
        // ArgumentException (still an Exception for existing handlers) names the failure precisely.
        throw new ArgumentException($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
    }

    await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
}
#endregion
}
}