From 12848983765ea4a554834293970001c90a572609 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=9B=8A?= Date: Mon, 14 Apr 2025 21:56:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=EF=BC=8C=E4=BD=BF=E7=94=A8=E7=BA=BF=E7=A8=8B=E5=88=86?= =?UTF-8?q?=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 357 +++++++++--------- .../DeviceGroupBalanceControl.cs | 114 ++++-- 2 files changed, 264 insertions(+), 207 deletions(-) diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index ce7b39b..7e7626c 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -3,6 +3,7 @@ using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; @@ -82,7 +83,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// public virtual async Task CreateToBeIssueTasks() - { + { var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*"; var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey); if (taskInfos == null || taskInfos.Length <= 0) @@ -98,7 +99,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102"); continue; - } + } //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 var tempArryay = item.Split(":"); @@ -128,19 +129,23 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } - await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - - DeviceGroupBalanceControl.ProcessAllGroups(deviceId => { - Console.WriteLine($"Processing {deviceId} on Thread {Thread.CurrentThread.ManagedThreadId}"); - }); - - await AmmerterCreatePublishTask(timeDensity, meterInfos); + //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); + + // 处理数据 + await DeviceGroupBalanceControl.ProcessGenericListAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, threadId) => + { + _= AmmerterCreatePublishTask(timeDensity, data); + } + ); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -567,7 +572,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据分组创建线程批处理集中器 foreach (var group in focusHashGroups) { - await AmmerterCreatePublishTask(timeDensity, group.Value); + await AmmerterCreatePublishTask2(timeDensity, group.Value); } } catch (Exception) @@ -582,10 +587,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 电表创建发布任务 /// /// 采集频率 - /// 集中器号hash分组的集中器集合数据 + /// 集中器号hash分组的集中器集合数据 /// private async Task AmmerterCreatePublishTask(int timeDensity - , List ammeterGroup) + , AmmeterInfo ammeterInfo) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? @@ -593,171 +598,167 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.Key}"; - - foreach (var ammeterInfo in ammeterGroup) + var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; + + if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { - - if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); - continue; - } - - //载波的不处理 - if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); - continue; - } - - if (ammeterInfo.State.Equals(2)) - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); - continue; - } - - ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 - //if (!IsGennerateCmd(ammeter.LastTime, -1)) - //{ - // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令"); - // continue; - //} - - if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); - continue; - } - if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); - continue; - } - if (Convert.ToInt32(ammeterInfo.Address) > 65535) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); - continue; - } - if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 2033) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); - continue; - } - - List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; - - //TODO:自动上报数据只主动采集1类数据。 - if (ammeterInfo.AutomaticReport.Equals(1)) - { - var tempSubCodes = new List(); - if (tempCodes.Contains("0C_49")) - { - tempSubCodes.Add("0C_49"); - } - - if (tempSubCodes.Contains("0C_149")) - { - tempSubCodes.Add("0C_149"); - } - - if (ammeterInfo.ItemCodes.Contains("10_97")) - { - tempSubCodes.Add("10_97"); - } - - if (tempSubCodes == null || tempSubCodes.Count <= 0) - { - _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); - continue; - } - else - { - tempCodes = tempSubCodes; - } - } - - Dictionary keyValuePairs = new Dictionary(); - - foreach (var tempItem in tempCodes) - { - //排除已发送日冻结和月冻结采集项配置 - if (DayFreezeCodes.Contains(tempItem)) - { - continue; - } - - if (MonthFreezeCodes.Contains(tempItem)) - { - continue; - } - - var itemCodeArr = tempItem.Split('_'); - var aFNStr = itemCodeArr[0]; - var aFN = (AFN)aFNStr.HexToDec(); - var fn = int.Parse(itemCodeArr[1]); - byte[] dataInfos = null; - if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) - { - //实时数据 - dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); - } - else - { - string methonCode = $"AFN{aFNStr}_Fn_Send"; - //特殊表暂不处理 - if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode - , out var handler)) - { - dataInfos = handler(new TelemetryPacketRequest() - { - FocusAddress = ammeterInfo.FocusAddress, - Fn = fn, - Pn = ammeterInfo.MeteringCode - }); - } - else - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); - continue; - } - } - //TODO:特殊表 - - if (dataInfos == null || dataInfos.Length <= 0) - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); - continue; - } - - - - var meterReadingRecords = new MeterReadingRecords() - { - ProjectID = ammeterInfo.ProjectID, - DatabaseBusiID = ammeterInfo.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, - CreationTime = currentTime, - MeterAddress = ammeterInfo.AmmerterAddress, - MeterId = ammeterInfo.ID, - MeterType = MeterTypeEnum.Ammeter, - FocusAddress = ammeterInfo.FocusAddress, - FocusID = ammeterInfo.FocusID, - AFN = aFN, - Fn = fn, - ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), - ManualOrNot = false, - Pn = ammeterInfo.MeteringCode, - IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(dataInfos), - }; - meterReadingRecords.CreateDataId(GuidGenerator.Create()); - - keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); - } - await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + return; } + + //载波的不处理 + if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); + return; + } + + if (ammeterInfo.State.Equals(2)) + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); + return; + } + + ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 + //if (!IsGennerateCmd(ammeter.LastTime, -1)) + //{ + // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令"); + // continue; + //} + + if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + return; + } + if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); + return; + } + if (Convert.ToInt32(ammeterInfo.Address) > 65535) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); + return; + } + if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 33) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); + return; + } + + List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; + + //TODO:自动上报数据只主动采集1类数据。 + if (ammeterInfo.AutomaticReport.Equals(1)) + { + var tempSubCodes = new List(); + if (tempCodes.Contains("0C_49")) + { + tempSubCodes.Add("0C_49"); + } + + if (tempSubCodes.Contains("0C_149")) + { + tempSubCodes.Add("0C_149"); + } + + if (ammeterInfo.ItemCodes.Contains("10_97")) + { + tempSubCodes.Add("10_97"); + } + + if (tempSubCodes == null || tempSubCodes.Count <= 0) + { + _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); + return; + } + else + { + tempCodes = tempSubCodes; + } + } + + Dictionary keyValuePairs = new Dictionary(); + + foreach (var tempItem in tempCodes) + { + //排除已发送日冻结和月冻结采集项配置 + if (DayFreezeCodes.Contains(tempItem)) + { + continue; + } + + if (MonthFreezeCodes.Contains(tempItem)) + { + continue; + } + + var itemCodeArr = tempItem.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)aFNStr.HexToDec(); + var fn = int.Parse(itemCodeArr[1]); + byte[] dataInfos = null; + if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) + { + //实时数据 + dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); + } + else + { + string methonCode = $"AFN{aFNStr}_Fn_Send"; + //特殊表暂不处理 + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + , out var handler)) + { + dataInfos = handler(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode + }); + } + else + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); + continue; + } + } + //TODO:特殊表 + + if (dataInfos == null || dataInfos.Length <= 0) + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); + continue; + } + + + + var meterReadingRecords = new MeterReadingRecords() + { + ProjectID = ammeterInfo.ProjectID, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = currentTime, + MeterAddress = ammeterInfo.AmmerterAddress, + MeterId = ammeterInfo.ID, + MeterType = MeterTypeEnum.Ammeter, + FocusAddress = ammeterInfo.FocusAddress, + FocusID = ammeterInfo.FocusID, + AFN = aFN, + Fn = fn, + ItemCode = tempItem, + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), + IssuedMessageHexString = Convert.ToHexString(dataInfos), + }; + meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); + } + await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } /// @@ -1033,7 +1034,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList,SystemType,ServerTagName ,timeDensity.ToString(), MeterTypeEnum.WaterMeter); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.WaterMeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); @@ -1081,7 +1082,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading #region 公共处理方法 - + /// /// 判断是否需要生成采集指令 diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 13d51a8..c7a5acd 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -108,48 +108,104 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl } /// - /// 并行处理所有分组设备(每个分组一个处理线程) + /// 并行处理泛型数据集(支持动态线程分配) /// - public static void ProcessAllGroups(Action> processAction) where T : DeviceGroupBasicModel + /// 已经分组的设备信息 + /// 部分或者全部的已经分组的设备集合 + /// 从泛型对象提取deviceId + /// 处理委托(参数:当前对象,线程ID) + /// 可选线程限制 + /// + /// + public static async Task ProcessGenericListAsync( + List items, Func deviceIdSelector, Action processor, int? maxThreads = null) { - var cache = _currentCache; - if (cache == null) - throw new InvalidOperationException("缓存未初始化"); + var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); - // 使用并行选项控制并发度 + // 创建分组任务队列 + var groupQueues = new ConcurrentQueue[cache.CachedGroups.Length]; + for (int i = 0; i < groupQueues.Length; i++) + { + groupQueues[i] = new ConcurrentQueue(); + } + + // 阶段1:分发数据到分组队列 + Parallel.ForEach(items, item => + { + var deviceId = deviceIdSelector(item); + if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId)) + { + groupQueues[groupId].Enqueue(item); + } + }); + + if ((maxThreads.HasValue && maxThreads.Value > cache.CachedGroups.Length) || maxThreads.HasValue == false) + { + maxThreads = cache.CachedGroups.Length; + } + + // 阶段2:并行处理队列 var options = new ParallelOptions { - MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量 + MaxDegreeOfParallelism = maxThreads.Value, }; - Parallel.For(0, cache.CachedGroups.Length, options, groupId => + await Task.Run(() => { - // 获取当前分组的只读副本 - var groupDevices = GetGroupSnapshot(cache, groupId); - - processAction(groupDevices); - - //foreach (var deviceId in groupDevices) - //{ - // //执行处理操作 - // processAction(deviceId); - - // // 可添加取消检测 - // // if (token.IsCancellationRequested) break; - //} + Parallel.For(0, cache.CachedGroups.Length, options, groupId => + { + var queue = groupQueues[groupId]; + while (queue.TryDequeue(out T item)) + { + processor(item, Thread.CurrentThread.ManagedThreadId); + } + }); }); } /// - /// 获取分组数据快照(线程安全) + /// 并行处理所有分组设备(每个分组一个处理线程) /// - private static IReadOnlyList GetGroupSnapshot(CacheState cache, int groupId) - { - lock (cache.CachedGroups[groupId]) - { - return cache.CachedGroups[groupId].ToList(); // 创建内存快照 - } - } + //public static void ProcessAllGroups(Action> processAction) where T : DeviceGroupBasicModel + //{ + // var cache = _currentCache; + // if (cache == null) + // throw new InvalidOperationException("缓存未初始化"); + + // // 使用并行选项控制并发度 + // var options = new ParallelOptions + // { + // MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量 + // }; + + // Parallel.For(0, cache.CachedGroups.Length, options, groupId => + // { + // // 获取当前分组的只读副本 + // var groupDevices = GetGroupSnapshot(cache, groupId); + + // processAction(groupDevices); + + // //foreach (var deviceId in groupDevices) + // //{ + // // //执行处理操作 + // // processAction(deviceId); + + // // // 可添加取消检测 + // // // if (token.IsCancellationRequested) break; + // //} + // }); + //} + + ///// + ///// 获取分组数据快照(线程安全) + ///// + //public static IReadOnlyList GetGroupSnapshot(CacheState cache, int groupId) + //{ + // lock (cache.CachedGroups[groupId]) + // { + // return cache.CachedGroups[groupId].ToList(); // 创建内存快照 + // } + //} /// /// 通过 deviceId 获取所在的分组集合