diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index d032194..ce7b39b 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -6,6 +6,7 @@ using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.EnergySystems.Entities; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -81,10 +82,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// public virtual async Task CreateToBeIssueTasks() - { - - //创建指定数量的线程, - + { var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*"; var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey); if (taskInfos == null || taskInfos.Length <= 0) @@ -116,7 +114,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading - //获取缓存中的电表信息 + //获取缓存中的表信息 var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*"; var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) @@ -137,6 +135,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); + + DeviceGroupBalanceControl.ProcessAllGroups(deviceId => { + Console.WriteLine($"Processing {deviceId} on Thread {Thread.CurrentThread.ManagedThreadId}"); + }); + + await AmmerterCreatePublishTask(timeDensity, meterInfos); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -573,13 +577,196 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } + + /// + /// 电表创建发布任务 + /// + /// 采集频率 + /// 集中器号hash分组的集中器集合数据 + /// + private async Task AmmerterCreatePublishTask(int timeDensity + , List ammeterGroup) + { + var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; + //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? + + var currentTime = DateTime.Now; + var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); + //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 + var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.Key}"; + + foreach (var ammeterInfo in ammeterGroup) + { + + if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + continue; + } + + //载波的不处理 + if (ammeterInfo.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,载波不处理,-102"); + continue; + } + + if (ammeterInfo.State.Equals(2)) + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}状态为禁用,不处理"); + continue; + } + + ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 + //if (!IsGennerateCmd(ammeter.LastTime, -1)) + //{ + // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令"); + // continue; + //} + + if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + continue; + } + if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址为空"); + continue; + } + if (Convert.ToInt32(ammeterInfo.Address) > 65535) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信地址无效,确保大于65535"); + continue; + } + if (ammeterInfo.MeteringCode <= 0 || ammeterInfo.MeteringCode > 2033) + { + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},非有效测量点号({ammeterInfo.MeteringCode})"); + continue; + } + + List tempCodes = ammeterInfo.ItemCodes.Deserialize>()!; + + //TODO:自动上报数据只主动采集1类数据。 + if (ammeterInfo.AutomaticReport.Equals(1)) + { + var tempSubCodes = new List(); + if (tempCodes.Contains("0C_49")) + { + tempSubCodes.Add("0C_49"); + } + + if (tempSubCodes.Contains("0C_149")) + { + tempSubCodes.Add("0C_149"); + } + + if (ammeterInfo.ItemCodes.Contains("10_97")) + { + tempSubCodes.Add("10_97"); + } + + if (tempSubCodes == null || tempSubCodes.Count <= 0) + { + _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}自动上报数据主动采集1类数据时数据类型为空"); + continue; + } + else + { + tempCodes = tempSubCodes; + } + } + + Dictionary keyValuePairs = new Dictionary(); + + foreach (var tempItem in tempCodes) + { + //排除已发送日冻结和月冻结采集项配置 + if (DayFreezeCodes.Contains(tempItem)) + { + continue; + } + + if (MonthFreezeCodes.Contains(tempItem)) + { + continue; + } + + var itemCodeArr = tempItem.Split('_'); + var aFNStr = itemCodeArr[0]; + var aFN = (AFN)aFNStr.HexToDec(); + var fn = int.Parse(itemCodeArr[1]); + byte[] dataInfos = null; + if (ammeterInfo.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) + { + //实时数据 + dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeterInfo.FocusAddress, ammeterInfo.MeteringCode, (ATypeOfDataItems)fn); + } + else + { + string methonCode = $"AFN{aFNStr}_Fn_Send"; + //特殊表暂不处理 + if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode + , out var handler)) + { + dataInfos = handler(new TelemetryPacketRequest() + { + FocusAddress = ammeterInfo.FocusAddress, + Fn = fn, + Pn = ammeterInfo.MeteringCode + }); + } + else + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}无效编码。"); + continue; + } + } + //TODO:特殊表 + + if (dataInfos == null || dataInfos.Length <= 0) + { + _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}采集项{tempItem}未能正确获取报文。"); + continue; + } + + + + var meterReadingRecords = new MeterReadingRecords() + { + ProjectID = ammeterInfo.ProjectID, + DatabaseBusiID = ammeterInfo.DatabaseBusiID, + PendingCopyReadTime = pendingCopyReadTime, + CreationTime = currentTime, + MeterAddress = ammeterInfo.AmmerterAddress, + MeterId = ammeterInfo.ID, + MeterType = MeterTypeEnum.Ammeter, + FocusAddress = ammeterInfo.FocusAddress, + FocusID = ammeterInfo.FocusID, + AFN = aFN, + Fn = fn, + ItemCode = tempItem, + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeterInfo.MeteringCode), + ManualOrNot = false, + Pn = ammeterInfo.MeteringCode, + IssuedMessageId = GuidGenerator.Create().ToString(), + IssuedMessageHexString = Convert.ToHexString(dataInfos), + }; + meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); + } + await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + } + } + /// /// 电表创建发布任务 /// /// 采集频率 /// 集中器号hash分组的集中器集合数据 /// - private async Task AmmerterCreatePublishTask(int timeDensity + private async Task AmmerterCreatePublishTask2(int timeDensity , Dictionary> focusGroup) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; diff --git a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs similarity index 83% rename from src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs rename to src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 657ff7c..13d51a8 100644 --- a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -7,7 +7,7 @@ using System.Text; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; -namespace JiShe.CollectBus.Common.Helpers +namespace JiShe.CollectBus.Common.DeviceBalanceControl { /// /// 设备组负载控制 @@ -107,6 +107,50 @@ namespace JiShe.CollectBus.Common.Helpers } } + /// + /// 并行处理所有分组设备(每个分组一个处理线程) + /// + public static void ProcessAllGroups(Action> processAction) where T : DeviceGroupBasicModel + { + var cache = _currentCache; + if (cache == null) + throw new InvalidOperationException("缓存未初始化"); + + // 使用并行选项控制并发度 + var options = new ParallelOptions + { + MaxDegreeOfParallelism = cache.CachedGroups.Length // 严格匹配分组数量 + }; + + Parallel.For(0, cache.CachedGroups.Length, options, groupId => + { + // 获取当前分组的只读副本 + var groupDevices = GetGroupSnapshot(cache, groupId); + + processAction(groupDevices); + + //foreach (var deviceId in groupDevices) + //{ + // //执行处理操作 + // processAction(deviceId); + + // // 可添加取消检测 + // // if (token.IsCancellationRequested) break; + //} + }); + } + + /// + /// 获取分组数据快照(线程安全) + /// + private static IReadOnlyList GetGroupSnapshot(CacheState cache, int groupId) + { + lock (cache.CachedGroups[groupId]) + { + return cache.CachedGroups[groupId].ToList(); // 创建内存快照 + } + } + /// /// 通过 deviceId 获取所在的分组集合 /// diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs new file mode 100644 index 0000000..f12f15e --- /dev/null +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBasicModel.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.DeviceBalanceControl +{ + /// + /// 设备组基本模型 + /// + public class DeviceGroupBasicModel + { + /// + /// 设备Id + /// + public string DeviceId { get; set; } + } +}