并行处理所有分组设备

This commit is contained in:
ChenYi 2025-04-14 17:38:34 +08:00
parent 2fdf5850c8
commit 8b86381e7a
3 changed files with 257 additions and 7 deletions

View File

@ -6,6 +6,7 @@ using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.EnergySystems.Entities;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds;
@ -81,10 +82,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary>
/// <returns></returns>
public virtual async Task CreateToBeIssueTasks()
{
//创建指定数量的线程,
{
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
@ -116,7 +114,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的表信息
//获取缓存中的表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
@ -137,6 +135,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return;
}
await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
DeviceGroupBalanceControl.ProcessAllGroups(deviceId => {
Console.WriteLine($"Processing {deviceId} on Thread {Thread.CurrentThread.ManagedThreadId}");
});
await AmmerterCreatePublishTask(timeDensity, meterInfos);
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
@ -573,13 +577,196 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
, List<AmmeterInfo> ammeterGroup)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.Key}";
foreach (var ammeterInfo in ammeterGroup)
{
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
continue;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
continue;
}
if (ammeterInfo.State.Equals(2))
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
continue;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
continue;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
continue;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
continue;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 2033)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
continue;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
continue;
}
else
{
tempCodes = tempSubCodes;
}
}
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
}
else
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue;
}
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = new MeterReadingRecords()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
private async Task AmmerterCreatePublishTask2(int timeDensity
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;

View File

@ -7,7 +7,7 @@ using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.Helpers
namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
/// <summary>
/// 设备组负载控制
@ -107,6 +107,50 @@ namespace JiShe.CollectBus.Common.Helpers
}
}
/// <summary>
/// 并行处理所有分组设备(每个分组一个处理线程)
/// </summary>
public static void ProcessAllGroups<T>(Action<List<T>> processAction) where T : DeviceGroupBasicModel
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
// 使用并行选项控制并发度
var options = new ParallelOptions
{
MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量
};
Parallel.For(0, cache.CachedGroups.Length, options, groupId =>
{
// 获取当前分组的只读副本
var groupDevices = GetGroupSnapshot(cache, groupId);
processAction(groupDevices);
//foreach (var deviceId in groupDevices)
//{
// //执行处理操作
// processAction(deviceId);
// // 可添加取消检测
// // if (token.IsCancellationRequested) break;
//}
});
}
/// <summary>
/// 获取分组数据快照(线程安全)
/// </summary>
private static IReadOnlyList<string> GetGroupSnapshot(CacheState cache, int groupId)
{
lock (cache.CachedGroups[groupId])
{
return cache.CachedGroups[groupId].ToList(); // 创建内存快照
}
}
/// <summary>
/// 通过 deviceId 获取所在的分组集合
/// </summary>

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
/// <summary>
/// 设备组基本模型
/// </summary>
public class DeviceGroupBasicModel
{
/// <summary>
/// 设备Id
/// </summary>
public string DeviceId { get; set; }
}
}