1368 lines
64 KiB
C#
Raw Normal View History

using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
2025-04-16 17:36:46 +08:00
using JiShe.CollectBus.Application.Contracts;
2025-03-14 17:28:58 +08:00
using JiShe.CollectBus.Common.BuildSendDatas;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Enums;
2025-03-14 17:28:58 +08:00
using JiShe.CollectBus.Common.Extensions;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Common.Helpers;
2025-04-16 17:36:46 +08:00
using JiShe.CollectBus.Common.Models;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.GatherItem;
2025-04-08 17:44:42 +08:00
using JiShe.CollectBus.IoTDBProvider;
2025-03-17 14:23:48 +08:00
using JiShe.CollectBus.IotSystems.MessageIssueds;
2025-03-18 22:43:24 +08:00
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
2025-03-14 14:38:08 +08:00
using JiShe.CollectBus.IotSystems.Watermeter;
2025-04-15 15:49:51 +08:00
using JiShe.CollectBus.Kafka.Producer;
2025-03-14 14:24:38 +08:00
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache;
2025-03-20 16:40:27 +08:00
using JiShe.CollectBus.Repository.MeterReadingRecord;
2025-04-15 17:40:17 +08:00
using Mapster;
2025-03-14 14:24:38 +08:00
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
2025-03-14 14:24:38 +08:00
2025-03-14 14:38:08 +08:00
namespace JiShe.CollectBus.ScheduledMeterReading
2025-03-14 14:24:38 +08:00
{
/// <summary>
/// 定时采集服务
/// </summary>
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger<BasicScheduledMeterReadingService> _logger;
2025-04-08 17:44:42 +08:00
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
2025-04-15 15:57:14 +08:00
private readonly IProducerService _producerService;
2025-04-16 17:36:46 +08:00
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly ICapPublisher _producerBus;
2025-03-17 14:23:48 +08:00
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
2025-04-08 17:44:42 +08:00
IMeterReadingRecordRepository meterReadingRecordRepository,
2025-04-15 15:57:14 +08:00
IProducerService producerService,
2025-04-16 17:36:46 +08:00
IRedisDataCacheService redisDataCacheService,
2025-04-08 17:44:42 +08:00
IIoTDBProvider dbProvider)
2025-03-14 14:24:38 +08:00
{
_producerBus = producerBus;
2025-03-14 14:24:38 +08:00
_logger = logger;
2025-04-08 17:44:42 +08:00
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
2025-04-15 15:49:51 +08:00
_producerService = producerService;
2025-04-16 17:36:46 +08:00
_redisDataCacheService = redisDataCacheService;
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 系统类型
/// </summary>
public abstract string SystemType { get; }
2025-04-09 17:29:30 +08:00
/// <summary>
/// 应用服务器部署标记
/// </summary>
public abstract string ServerTagName { get; }
2025-03-14 14:24:38 +08:00
/// <summary>
///电表日冻结采集项
/// </summary>
protected List<string> DayFreezeCodes = new List<string>() { "0D_3", "0D_4", "0D_161", "0D_162", "0D_163", "0D_164", "0D_165", "0D_166", "0D_167", "0D_168", "0C_149", };
/// <summary>
/// 电表月冻结采集项
/// </summary>
protected List<string> MonthFreezeCodes = new List<string>() { "0D_177", "0D_178", "0D_179", "0D_180", "0D_181", "0D_182", "0D_183", "0D_184", "0D_193", "0D_195", };
/// <summary>
/// 获取采集项列表
/// </summary>
/// <returns></returns>
public virtual Task<List<GatherItemInfo>> GetGatherItemByDataTypes()
{
throw new NotImplementedException($"{nameof(GetGatherItemByDataTypes)}请根据不同系统类型进行实现");
}
/// <summary>
/// 构建待处理的下发指令任务处理
/// </summary>
/// <returns></returns>
public virtual async Task CreateToBeIssueTasks()
{
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时没有缓存数据,-101");
return;
}
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
if (tasksToBeIssueModel == null)
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
continue;
}
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
2025-04-15 09:43:51 +08:00
if (timeDensity > 15)
{
timeDensity = 15;
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
var timer = Stopwatch.StartNew();
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
2025-04-15 09:43:51 +08:00
2025-04-15 15:49:51 +08:00
//处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, threadId) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data);
// }
//);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
2025-04-15 09:43:51 +08:00
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data,groupIndex) =>
2025-04-15 09:43:51 +08:00
{
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
2025-04-15 09:43:51 +08:00
}
);
timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
//todo 水表任务创建待处理
//await WatermeterScheduledMeterReadingIssued(timeDensity, meterInfos);
}
else
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-106");
}
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成");
2025-04-13 21:26:27 +08:00
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity);
2025-04-13 21:26:27 +08:00
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
}
2025-03-14 14:24:38 +08:00
#region
/// <summary>
/// 获取电表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<AmmeterInfo>> GetAmmeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化电表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
#if DEBUG
2025-04-16 23:51:27 +08:00
//var timeDensity = "15";
//string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
////获取缓存中的电表信息
//var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
2025-04-15 17:40:17 +08:00
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
////List<string> focusAddressDataLista = new List<string>();
2025-04-16 23:51:27 +08:00
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//foreach (var item in tempMeterInfos)
2025-04-16 23:51:27 +08:00
//{
// var tempData = item.Adapt<AmmeterInfo>();
// tempData.FocusId = item.FocusID;
// tempData.MeterId = item.Id;
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew();
//decimal? cursor = null;
//string member = null;
//bool hasNext;
//do
//{
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
var allIds = new HashSet<string>();
decimal? score = null;
string member = null;
while (true)
{
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: score,
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
throw new Exception("Duplicate data found!");
}
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//return;
#else
2025-03-14 14:24:38 +08:00
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
2025-03-14 14:24:38 +08:00
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
}
2025-04-15 17:40:17 +08:00
var timer = Stopwatch.StartNew();
2025-03-14 14:24:38 +08:00
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
2025-03-14 14:24:38 +08:00
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
2025-04-16 17:36:46 +08:00
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
2025-04-15 17:40:17 +08:00
List<AmmeterInfo> ammeterInfos = new List<AmmeterInfo>();
2025-03-14 14:24:38 +08:00
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{
if (string.IsNullOrWhiteSpace(item.Key))//集中器号为空,跳过
2025-03-14 14:24:38 +08:00
{
continue;
}
2025-04-10 14:12:14 +08:00
focusAddressDataList.Add(item.Key);
2025-04-15 17:40:17 +08:00
// var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
2025-03-21 11:48:31 +08:00
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#else
2025-03-21 11:48:31 +08:00
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
2025-04-15 17:40:17 +08:00
//await FreeRedisProvider.Instance.DelAsync(redisCacheKey);
#endif
2025-04-15 17:40:17 +08:00
//Dictionary<string, AmmeterInfo> keyValuePairs = new Dictionary<string, AmmeterInfo>();
2025-03-14 14:24:38 +08:00
foreach (var ammeter in item)
{
//处理ItemCode
2025-03-27 08:38:19 +08:00
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes))
2025-03-14 14:24:38 +08:00
{
var itemArr = ammeter.DataTypes.Split(',').ToList();
#region
List<string> itemCodeList = new List<string>();
foreach (var dataType in itemArr)
{
2025-03-17 11:34:30 +08:00
var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表
2025-03-14 14:24:38 +08:00
var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType));
if (gatherItem != null)
{
if (!excludeItemCode.Contains(gatherItem.ItemCode))
{
itemCodeList.Add(gatherItem.ItemCode);
}
}
2025-03-17 11:34:30 +08:00
#region
if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS
{
itemCodeList.Add("10_95");
}
if (itemArr.Exists(e => e.Equals("109")))//WAVE_109
{
itemCodeList.Add("10_109");
}
#endregion
2025-03-14 14:24:38 +08:00
}
#endregion
2025-03-17 11:34:30 +08:00
2025-03-14 14:24:38 +08:00
ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串
if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109");
}
}
2025-04-15 17:40:17 +08:00
ammeterInfos.Add(ammeter);
//keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
2025-03-14 14:24:38 +08:00
}
2025-04-15 17:40:17 +08:00
//await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
2025-03-14 14:24:38 +08:00
}
2025-04-16 17:36:46 +08:00
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
2025-04-15 17:40:17 +08:00
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
TimeDensity = itemTimeDensity.Key,
NextTaskTime = DateTime.Now.AddMinutes(1)
};
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
2025-03-14 14:24:38 +08:00
}
2025-04-10 14:12:14 +08:00
//初始化设备组负载控制
if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
}
2025-04-15 17:40:17 +08:00
timer.Stop();
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 1分钟采集电表数据只获取任务数据下发不构建任务
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
//获取缓存中的电表信息
int timeDensity = 1;
2025-04-08 17:44:42 +08:00
var currentTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
2025-03-17 08:35:19 +08:00
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
2025-03-14 14:24:38 +08:00
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
2025-03-14 14:24:38 +08:00
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
2025-03-14 14:24:38 +08:00
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
2025-03-14 14:24:38 +08:00
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
2025-03-14 14:24:38 +08:00
{
foreach (var ammerterItem in focusItem.Value)
{
2025-03-18 22:43:24 +08:00
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
2025-04-16 18:26:25 +08:00
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg);
2025-04-07 21:34:05 +08:00
//_= _producerBus.Publish(tempMsg);
2025-04-01 22:50:34 +08:00
meterTaskInfosList.Add(ammerterItem.Value);
}
2025-03-14 14:24:38 +08:00
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
2025-03-14 14:24:38 +08:00
{
2025-04-08 17:44:42 +08:00
//_dbProvider.SwitchSessionPool(true);
//await _dbProvider.InsertAsync(meterTaskInfosList);
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
2025-03-14 14:24:38 +08:00
}
2025-04-13 21:26:27 +08:00
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
2025-03-18 22:43:24 +08:00
_logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 5分钟采集电表数据
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFiveMinuteReading()
2025-03-14 14:24:38 +08:00
{
//获取缓存中的电表信息
int timeDensity = 5;
2025-04-08 17:44:42 +08:00
var currentTime = DateTime.Now;
2025-04-10 14:12:14 +08:00
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
2025-03-18 22:43:24 +08:00
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
2025-03-14 14:24:38 +08:00
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
2025-03-14 14:24:38 +08:00
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
2025-03-17 11:34:30 +08:00
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
2025-03-14 14:24:38 +08:00
{
foreach (var ammerterItem in focusItem.Value)
2025-03-14 14:24:38 +08:00
{
2025-03-18 22:43:24 +08:00
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
2025-04-16 18:26:25 +08:00
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
2025-04-01 22:50:34 +08:00
2025-04-07 21:34:05 +08:00
//_ = _producerBus.Publish(tempMsg);
2025-03-18 22:43:24 +08:00
meterTaskInfosList.Add(ammerterItem.Value);
2025-03-14 14:24:38 +08:00
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
2025-03-14 14:24:38 +08:00
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
2025-03-14 14:24:38 +08:00
}
2025-04-13 21:26:27 +08:00
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
2025-03-18 22:43:24 +08:00
2025-04-13 21:26:27 +08:00
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
2025-03-14 14:24:38 +08:00
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 15分钟采集电表数据
2025-03-14 14:24:38 +08:00
/// </summary>
/// <returns></returns>
public virtual async Task AmmeterScheduledMeterFifteenMinuteReading()
2025-03-14 14:24:38 +08:00
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
2025-03-14 14:24:38 +08:00
//获取缓存中的电表信息
int timeDensity = 15;
2025-03-20 16:40:27 +08:00
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
2025-03-18 22:43:24 +08:00
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
2025-03-14 14:24:38 +08:00
return;
}
2025-03-18 22:43:24 +08:00
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
2025-03-14 14:24:38 +08:00
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
2025-03-14 14:24:38 +08:00
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
2025-03-14 14:24:38 +08:00
{
foreach (var ammerterItem in focusItem.Value)
{
2025-03-18 22:43:24 +08:00
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
2025-03-24 20:54:31 +08:00
2025-04-16 18:26:25 +08:00
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
2025-03-18 22:43:24 +08:00
2025-04-07 21:34:05 +08:00
//_ = _producerBus.Publish(tempMsg);
2025-03-18 22:43:24 +08:00
meterTaskInfosList.Add(ammerterItem.Value);
}
2025-03-14 14:24:38 +08:00
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
2025-03-14 14:24:38 +08:00
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
2025-03-14 14:24:38 +08:00
}
2025-04-15 15:49:51 +08:00
2025-03-18 22:43:24 +08:00
stopwatch.Stop();
2025-03-14 14:24:38 +08:00
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 电表采集任务指令创建
/// </summary>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
2025-03-14 14:24:38 +08:00
/// <param name="focusGroup">集中器数据分组</param>
/// <returns></returns>
2025-03-20 16:40:27 +08:00
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
2025-03-14 14:24:38 +08:00
{
2025-03-20 16:40:27 +08:00
if (timeDensity <= 0)
{
timeDensity = 1;
}
if (timeDensity > 15)
{
timeDensity = 15;
}
if (focusGroup == null || focusGroup.Count <= 0)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
return;
}
try
{
//将采集器编号的hash值取模分组
2025-03-17 14:23:48 +08:00
const int TotalShards = 1024;
2025-03-14 14:24:38 +08:00
var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>();
foreach (var (collectorId, ammetersDictionary) in focusGroup)
{
if (string.IsNullOrWhiteSpace(collectorId))
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败无效Key -102");
continue;
}
// 计算哈希分组ID
int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
// 获取或创建分组(避免重复查找)
if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
{
group = new Dictionary<string, Dictionary<string, AmmeterInfo>>();
focusHashGroups[hashGroupId] = group;
}
// 将当前集中器数据加入分组
group[collectorId] = ammetersDictionary;
}
if (focusHashGroups == null)
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
return;
}
//根据分组创建线程批处理集中器
foreach (var group in focusHashGroups)
{
await AmmerterCreatePublishTask2(timeDensity, group.Value);
2025-03-14 14:24:38 +08:00
}
}
catch (Exception)
{
throw;
2025-03-14 17:28:58 +08:00
}
2025-03-14 14:24:38 +08:00
}
2025-04-14 17:38:34 +08:00
2025-03-14 14:24:38 +08:00
/// <summary>
/// 电表创建发布任务
2025-03-14 14:24:38 +08:00
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
2025-03-14 14:24:38 +08:00
/// <returns></returns>
2025-03-20 16:40:27 +08:00
private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
2025-04-14 17:38:34 +08:00
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
2025-04-15 09:43:51 +08:00
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
2025-04-15 15:49:51 +08:00
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
2025-04-15 15:49:51 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return;
}
2025-04-14 17:38:34 +08:00
if (ammeterInfo.State.Equals(2))
2025-04-14 17:38:34 +08:00
{
2025-04-15 15:49:51 +08:00
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
2025-04-14 17:38:34 +08:00
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
2025-04-15 15:49:51 +08:00
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
2025-04-15 15:49:51 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
return;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
2025-04-15 15:49:51 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
return;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
2025-04-15 15:49:51 +08:00
//_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
return;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("0C_49");
2025-04-14 17:38:34 +08:00
}
if (tempSubCodes.Contains("0C_149"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("0C_149");
2025-04-14 17:38:34 +08:00
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
2025-04-14 17:38:34 +08:00
{
tempSubCodes.Add("10_97");
2025-04-14 17:38:34 +08:00
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
2025-04-14 17:38:34 +08:00
{
2025-04-15 15:49:51 +08:00
//_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return;
2025-04-14 17:38:34 +08:00
}
else
2025-04-14 17:38:34 +08:00
{
tempCodes = tempSubCodes;
2025-04-14 17:38:34 +08:00
}
}
//Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
2025-04-14 17:38:34 +08:00
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
2025-04-14 17:38:34 +08:00
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
2025-04-14 17:38:34 +08:00
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
2025-04-14 17:38:34 +08:00
}
else
2025-04-14 17:38:34 +08:00
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
2025-04-14 17:38:34 +08:00
{
dataInfos = handler(new TelemetryPacketRequest()
2025-04-14 17:38:34 +08:00
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
2025-04-14 17:38:34 +08:00
}
else
2025-04-14 17:38:34 +08:00
{
2025-04-15 15:49:51 +08:00
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
2025-04-14 17:38:34 +08:00
continue;
}
}
//TODO:特殊表
2025-04-14 17:38:34 +08:00
if (dataInfos == null || dataInfos.Length <= 0)
{
2025-04-15 15:49:51 +08:00
//_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
2025-04-14 17:38:34 +08:00
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
2025-04-15 16:48:35 +08:00
MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusId = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
taskList.Add(meterReadingRecords);
2025-04-14 17:38:34 +08:00
}
2025-04-15 09:43:51 +08:00
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// pipe.HSet(redisCacheKey, keyValuePairs);
// object[] ret = pipe.EndPipe();
//}
if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
2025-04-15 09:43:51 +08:00
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
return;
2025-04-15 09:43:51 +08:00
}
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoSetIndexKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
taskList);
2025-04-15 15:49:51 +08:00
}
/// <summary>
/// Kafka 推送消息
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingRecords taskRecord)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
2025-04-15 15:57:14 +08:00
2025-04-16 18:26:25 +08:00
await _producerService.ProduceAsync(topicName, partition, taskRecord);
2025-04-15 15:49:51 +08:00
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
{
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, meterType);
//FreeRedisProvider.Instance.key()
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), meterType);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
2025-04-15 15:49:51 +08:00
2025-04-16 18:26:25 +08:00
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
2025-04-15 15:49:51 +08:00
//_ = _producerBus.Publish(tempMsg);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
2025-04-14 17:38:34 +08:00
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask2(int timeDensity
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
2025-03-14 14:24:38 +08:00
{
2025-03-20 16:40:27 +08:00
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
2025-03-20 16:40:27 +08:00
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
2025-03-14 14:24:38 +08:00
foreach (var focusInfo in focusGroup)
{
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
2025-04-16 17:36:46 +08:00
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
2025-03-14 14:24:38 +08:00
foreach (var ammeterInfo in focusInfo.Value)
2025-03-14 17:28:58 +08:00
{
var ammeter = ammeterInfo.Value;
2025-03-14 14:24:38 +08:00
2025-03-14 17:28:58 +08:00
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
2025-03-14 14:24:38 +08:00
continue;
}
//载波的不处理
2025-03-14 17:28:58 +08:00
if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
2025-03-14 14:24:38 +08:00
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-14 17:28:58 +08:00
if (ammeter.State.Equals(2))
2025-03-14 14:24:38 +08:00
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-17 11:34:30 +08:00
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
2025-03-14 14:24:38 +08:00
2025-03-14 17:28:58 +08:00
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
2025-03-14 14:24:38 +08:00
{
2025-04-15 16:48:35 +08:00
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-14 17:28:58 +08:00
if (string.IsNullOrWhiteSpace(ammeter.Address))
2025-03-14 14:24:38 +08:00
{
2025-04-15 16:48:35 +08:00
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-14 17:28:58 +08:00
if (Convert.ToInt32(ammeter.Address) > 65535)
2025-03-14 14:24:38 +08:00
{
2025-04-15 16:48:35 +08:00
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-14 17:28:58 +08:00
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
2025-03-14 14:24:38 +08:00
{
2025-04-15 16:48:35 +08:00
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
2025-03-14 14:24:38 +08:00
continue;
}
2025-03-14 17:28:58 +08:00
List<string> tempCodes = ammeter.ItemCodes.Deserialize<List<string>>()!;
2025-03-14 14:24:38 +08:00
//TODO:自动上报数据只主动采集1类数据。
2025-03-14 17:28:58 +08:00
if (ammeter.AutomaticReport.Equals(1))
2025-03-14 14:24:38 +08:00
{
var tempSubCodes = new List<string>();
2025-03-14 17:28:58 +08:00
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
2025-03-14 14:24:38 +08:00
{
2025-03-14 17:28:58 +08:00
tempSubCodes.Add("0C_149");
}
if (ammeter.ItemCodes.Contains("10_97"))
2025-03-14 14:24:38 +08:00
{
2025-03-14 17:28:58 +08:00
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
2025-03-14 14:24:38 +08:00
continue;
}
else
{
2025-03-14 17:28:58 +08:00
tempCodes = tempSubCodes;
2025-03-14 14:24:38 +08:00
}
}
2025-03-17 11:34:30 +08:00
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
2025-03-14 14:24:38 +08:00
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
2025-03-14 17:28:58 +08:00
if (DayFreezeCodes.Contains(tempItem))
2025-03-14 14:24:38 +08:00
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
2025-03-14 17:28:58 +08:00
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
2025-03-14 17:28:58 +08:00
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
2025-03-17 11:34:30 +08:00
if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.)
2025-03-14 17:28:58 +08:00
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
2025-03-17 11:34:30 +08:00
//特殊表暂不处理
2025-03-20 16:40:27 +08:00
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
2025-03-14 17:28:58 +08:00
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeter.FocusAddress,
Fn = fn,
Pn = ammeter.MeteringCode
});
2025-03-14 17:28:58 +08:00
}
else
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。");
2025-03-17 11:34:30 +08:00
continue;
2025-03-14 17:28:58 +08:00
}
}
//TODO:特殊表
2025-03-14 14:24:38 +08:00
2025-03-14 17:28:58 +08:00
if (dataInfos == null || dataInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
2025-03-14 17:28:58 +08:00
continue;
}
2025-03-14 14:24:38 +08:00
2025-03-18 22:43:24 +08:00
var meterReadingRecords = new MeterReadingRecords()
2025-03-14 17:28:58 +08:00
{
2025-03-21 11:48:31 +08:00
ProjectID = ammeter.ProjectID,
DatabaseBusiID = ammeter.DatabaseBusiID,
2025-03-20 16:40:27 +08:00
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
2025-03-18 22:43:24 +08:00
MeterAddress = ammeter.AmmerterAddress,
2025-04-15 16:48:35 +08:00
MeterId = ammeter.MeterId,
2025-03-18 22:43:24 +08:00
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress,
2025-04-15 16:48:35 +08:00
FocusID = ammeter.FocusId,
2025-03-18 22:43:24 +08:00
AFN = aFN,
Fn = fn,
2025-04-08 17:44:42 +08:00
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
2025-04-08 17:44:42 +08:00
ManualOrNot = false,
2025-03-18 22:43:24 +08:00
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
2025-03-18 21:21:35 +08:00
IssuedMessageHexString = Convert.ToHexString(dataInfos),
2025-03-14 17:28:58 +08:00
};
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
2025-04-15 16:48:35 +08:00
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
2025-03-14 17:28:58 +08:00
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
}
}
#endregion
#region
/// <summary>
/// 获取水表信息
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "")
{
throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现");
}
/// <summary>
/// 初始化水表缓存数据
/// </summary>
/// <param name="gatherCode">采集端Code</param>
/// <returns></returns>
public virtual async Task InitWatermeterCacheData(string gatherCode = "")
{
var meterInfos = await GetWatermeterInfoList(gatherCode);
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitWatermeterCacheData)} 初始化水表缓存数据时,水表数据为空");
}
//获取采集项类型数据
var gatherItemInfos = await GetGatherItemByDataTypes();
if (gatherItemInfos == null || gatherItemInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据时,采集项类型数据为空");
}
2025-03-14 14:24:38 +08:00
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
//将表计信息根据集中器分组,获得集中器号
var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList();
foreach (var item in meterInfoGroup)
{
if (string.IsNullOrWhiteSpace(item.Key))
{
continue;
2025-03-14 17:28:58 +08:00
}
2025-03-14 14:24:38 +08:00
2025-04-16 17:36:46 +08:00
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
Dictionary<string, WatermeterInfo> keyValuePairs = new Dictionary<string, WatermeterInfo>();
foreach (var subItem in item)
{
2025-04-15 16:48:35 +08:00
keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
2025-03-14 14:24:38 +08:00
}
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
TimeDensity = itemTimeDensity.Key,
NextTaskTime = DateTime.Now.AddMinutes(1)
};
2025-04-11 11:56:23 +08:00
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
2025-03-17 11:34:30 +08:00
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化水表缓存数据完成");
2025-03-14 14:24:38 +08:00
}
/// <summary>
/// 水表数据采集
/// </summary>
/// <returns></returns>
public virtual async Task WatermeterScheduledMeterAutoReading()
{
//获取缓存中的水表信息
int timeDensity = 1;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101");
return;
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
{
foreach (var ammerterItem in focusItem.Value)
{
2025-03-18 22:43:24 +08:00
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
2025-04-15 15:57:14 +08:00
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
2025-04-01 22:50:34 +08:00
2025-04-07 21:34:05 +08:00
//_ = _producerBus.Publish(tempMsg);
2025-04-01 22:50:34 +08:00
2025-03-18 22:43:24 +08:00
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
2025-04-13 21:26:27 +08:00
////删除任务数据
//await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
2025-03-18 22:43:24 +08:00
2025-04-13 21:26:27 +08:00
////缓存下一个时间的任务
//await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成");
}
#endregion
#region
/// <summary>
/// 判断是否需要生成采集指令
/// </summary>
/// <param name="nextTaskTime"></param>
/// <param name="timeDensity"></param>
/// <returns></returns>
private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0)
{
if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime)
{
return true;
}
return false;
}
2025-03-18 22:43:24 +08:00
///// <summary>
///// 指定时间对比当前时间
///// </summary>
///// <param name="lastTime"></param>
///// <param name="subtrahend"></param>
///// <returns></returns>
//private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0)
//{
// if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令
// return false;
// return true;
//}
2025-03-14 14:24:38 +08:00
2025-04-13 21:26:27 +08:00
///// <summary>
///// 缓存下一个时间的任务
///// </summary>
///// <param name="timeDensity">采集频率</param>
///// <param name="meterType">表类型</param>
///// <returns></returns>
//private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
//{
// //缓存下一个时间的任务
// TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
// {
// TimeDensity = timeDensity,
// NextTask = DateTime.Now.AddMinutes(timeDensity)
// };
// var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
// await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
//}
/// <summary>
/// 获取缓存表计下发指令缓存key前缀
/// </summary>
/// <param name="timeDensity"></param>
/// <param name="meterType"></param>
/// <returns></returns>
private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
{
2025-04-16 17:36:46 +08:00
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
2025-03-14 14:24:38 +08:00
#endregion
2025-03-14 14:24:38 +08:00
}
}