优化任务创建,使用线程分组

This commit is contained in:
陈益 2025-04-14 21:56:24 +08:00
parent 8b86381e7a
commit 1284898376
2 changed files with 264 additions and 207 deletions

View File

@ -3,6 +3,7 @@ using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
@ -82,7 +83,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary>
/// <returns></returns>
public virtual async Task CreateToBeIssueTasks()
{
{
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
@ -98,7 +99,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
_logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102");
continue;
}
}
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":");
@ -128,19 +129,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
// 解析结果(结果为嵌套数组)
var meterInfos = await GetMeterRedisCacheDictionaryData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
if (meterInfos == null || meterInfos.Count <= 0)
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
DeviceGroupBalanceControl.ProcessAllGroups(deviceId => {
Console.WriteLine($"Processing {deviceId} on Thread {Thread.CurrentThread.ManagedThreadId}");
});
await AmmerterCreatePublishTask(timeDensity, meterInfos);
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
// 处理数据
await DeviceGroupBalanceControl.ProcessGenericListAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, threadId) =>
{
_= AmmerterCreatePublishTask(timeDensity, data);
}
);
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{
@ -567,7 +572,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据分组创建线程批处理集中器
foreach (var group in focusHashGroups)
{
await AmmerterCreatePublishTask(timeDensity, group.Value);
await AmmerterCreatePublishTask2(timeDensity, group.Value);
}
}
catch (Exception)
@ -582,10 +587,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="ammeterGroup">集中器号hash分组的集中器集合数据</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
, List<AmmeterInfo> ammeterGroup)
, AmmeterInfo ammeterInfo)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
@ -593,171 +598,167 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.Key}";
foreach (var ammeterInfo in ammeterGroup)
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
continue;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
continue;
}
if (ammeterInfo.State.Equals(2))
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
continue;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
continue;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
continue;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
continue;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 2033)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
continue;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
continue;
}
else
{
tempCodes = tempSubCodes;
}
}
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
}
else
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue;
}
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = new MeterReadingRecords()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return;
}
//载波的不处理
if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102");
return;
}
if (ammeterInfo.State.Equals(2))
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理");
return;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空");
return;
}
if (Convert.ToInt32(ammeterInfo.Address) > 65535)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535");
return;
}
if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})");
return;
}
List<string> tempCodes = ammeterInfo.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeterInfo.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeterInfo.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空");
return;
}
else
{
tempCodes = tempSubCodes;
}
}
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeterInfo.FocusAddress,
Fn = fn,
Pn = ammeterInfo.MeteringCode
});
}
else
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。");
continue;
}
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = new MeterReadingRecords()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode),
ManualOrNot = false,
Pn = ammeterInfo.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
/// <summary>
@ -1033,7 +1034,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList,SystemType,ServerTagName ,timeDensity.ToString(), MeterTypeEnum.WaterMeter);
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
@ -1081,7 +1082,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
#region
/// <summary>
/// 判断是否需要生成采集指令

View File

@ -108,48 +108,104 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
}
/// <summary>
/// 并行处理所有分组设备(每个分组一个处理线程
/// 并行处理泛型数据集(支持动态线程分配
/// </summary>
public static void ProcessAllGroups<T>(Action<List<T>> processAction) where T : DeviceGroupBasicModel
/// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托参数当前对象线程ID</param>
/// <param name="maxThreads">可选线程限制</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessGenericListAsync<T>(
List<T> items, Func<T, string> deviceIdSelector, Action<T, int> processor, int? maxThreads = null)
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// 使用并行选项控制并发度
// 创建分组任务队列
var groupQueues = new ConcurrentQueue<T>[cache.CachedGroups.Length];
for (int i = 0; i < groupQueues.Length; i++)
{
groupQueues[i] = new ConcurrentQueue<T>();
}
// 阶段1分发数据到分组队列
Parallel.ForEach(items, item =>
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
groupQueues[groupId].Enqueue(item);
}
});
if ((maxThreads.HasValue && maxThreads.Value > cache.CachedGroups.Length) || maxThreads.HasValue == false)
{
maxThreads = cache.CachedGroups.Length;
}
// 阶段2并行处理队列
var options = new ParallelOptions
{
MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量
MaxDegreeOfParallelism = maxThreads.Value,
};
Parallel.For(0, cache.CachedGroups.Length, options, groupId =>
await Task.Run(() =>
{
// 获取当前分组的只读副本
var groupDevices = GetGroupSnapshot(cache, groupId);
processAction(groupDevices);
//foreach (var deviceId in groupDevices)
//{
// //执行处理操作
// processAction(deviceId);
// // 可添加取消检测
// // if (token.IsCancellationRequested) break;
//}
Parallel.For(0, cache.CachedGroups.Length, options, groupId =>
{
var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item))
{
processor(item, Thread.CurrentThread.ManagedThreadId);
}
});
});
}
/// <summary>
/// 获取分组数据快照(线程安全
/// 并行处理所有分组设备(每个分组一个处理线程)
/// </summary>
private static IReadOnlyList<string> GetGroupSnapshot(CacheState cache, int groupId)
{
lock (cache.CachedGroups[groupId])
{
return cache.CachedGroups[groupId].ToList(); // 创建内存快照
}
}
//public static void ProcessAllGroups<T>(Action<List<T>> processAction) where T : DeviceGroupBasicModel
//{
// var cache = _currentCache;
// if (cache == null)
// throw new InvalidOperationException("缓存未初始化");
// // 使用并行选项控制并发度
// var options = new ParallelOptions
// {
// MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量
// };
// Parallel.For(0, cache.CachedGroups.Length, options, groupId =>
// {
// // 获取当前分组的只读副本
// var groupDevices = GetGroupSnapshot(cache, groupId);
// processAction(groupDevices);
// //foreach (var deviceId in groupDevices)
// //{
// // //执行处理操作
// // processAction(deviceId);
// // // 可添加取消检测
// // // if (token.IsCancellationRequested) break;
// //}
// });
//}
///// <summary>
///// 获取分组数据快照(线程安全)
///// </summary>
//public static IReadOnlyList<string> GetGroupSnapshot(CacheState cache, int groupId)
//{
// lock (cache.CachedGroups[groupId])
// {
// return cache.CachedGroups[groupId].ToList(); // 创建内存快照
// }
//}
/// <summary>
/// 通过 deviceId 获取所在的分组集合