From 142f864544feddd3af5327abc49291fed04ecc7f Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 14 Apr 2025 23:42:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98=E7=94=B5?= =?UTF-8?q?=E8=A1=A8=E4=BF=A1=E6=81=AF=E8=8E=B7=E5=8F=96=EF=BC=8C=E9=87=87?= =?UTF-8?q?=E7=94=A8=E5=88=86=E9=A1=B5=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusAppService.cs | 198 ++++++++++++------ .../Samples/SampleAppService.cs | 2 +- .../BasicScheduledMeterReadingService.cs | 36 ++-- ...nergySystemScheduledMeterReadingService.cs | 1 + .../Workers/CreateToBeIssueTaskWorker.cs | 4 +- 5 files changed, 156 insertions(+), 85 deletions(-) diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs index 467b487..f84d82f 100644 --- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -9,6 +9,7 @@ using JiShe.CollectBus.Serializer; using Microsoft.AspNetCore.Mvc; using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -38,57 +39,87 @@ public abstract class CollectBusAppService : ApplicationService /// protected async Task>> GetMeterRedisCacheDictionaryData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class { - if (redisKeys == null || redisKeys.Length <=0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) + if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) { throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101"); } - //通过lua脚本一次性获取所有缓存内容 + var meterInfos = new Dictionary>(); var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS - if (merterResult == null) - { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,没有获取到数据,-102"); - } + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; - // 解析结果(结果为嵌套数组) - var meterInfos = new Dictionary>(); ; - if (merterResult is object[] arr) + // 分页参数:每页处理10000个键 + int pageSize = 10000; + int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); + + for (int page = 0; page < totalPages; page++) { - foreach (object[] item in arr) + // 分页获取当前批次的键 + var batchKeys = redisKeys + .Skip(page * pageSize) + .Take(pageSize) + .ToArray(); + + // 执行Lua脚本获取当前批次数据 + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); + if (merterResult == null) { - string key = (string)item[0];//集中器地址对应的Redis缓存Key - object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; - string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102"); + } - var meterHashs = new Dictionary(); - for (int i = 0; i < fieldsAndValues.Length; i += 2) + // 解析当前批次的结果 + if (merterResult is object[] arr) + { + foreach (object[] item in arr) { - string meterld = (string)fieldsAndValues[i];//表ID - string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 + string key = (string)item[0]; + object[] fieldsAndValues = (object[])item[1]; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; + string focusAddress = key.Replace(redisCacheKey, ""); - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) + var meterHashs = new Dictionary(); + for (int i = 0; i < fieldsAndValues.Length; i += 2) { - meterInfo = meterStr.Deserialize()!; + string meterId = (string)fieldsAndValues[i]; + string meterStr = (string)fieldsAndValues[i + 1]; + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterHashs[meterId] = meterInfo; + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103"); + } } - if (meterInfo != null) + + // 合并到总结果,若存在重复key则覆盖 + if (meterInfos.ContainsKey(focusAddress)) { - meterHashs[meterld] = meterInfo; + foreach (var kvp in meterHashs) + { + meterInfos[focusAddress][kvp.Key] = kvp.Value; + } } else { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-102"); + meterInfos[focusAddress] = meterHashs; } } - meterInfos[focusAddress] = meterHashs; + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104"); } } @@ -105,58 +136,87 @@ public abstract class CollectBusAppService : ApplicationService /// 采集频率,1分钟、5分钟、15分钟 /// 表计类型 /// - protected async Task> GetMeterRedisCacheListData(string[] redisKeys,string systemType,string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class + protected async Task> GetMeterRedisCacheListData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class { - if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) + if (redisKeys == null || redisKeys.Length <= 0 || + string.IsNullOrWhiteSpace(systemType) || + string.IsNullOrWhiteSpace(serverTagName) || + string.IsNullOrWhiteSpace(timeDensity)) { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,参数异常,-101"); + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101"); } - //通过lua脚本一次性获取所有缓存内容 + var meterInfos = new List(); var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS - if (merterResult == null) - { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,没有获取到数据,-102"); - } + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; - // 解析结果(结果为嵌套数组) - var meterInfos = new List(); ; - if (merterResult is object[] arr) + // 分页参数:每页10000个键 + int pageSize = 10000; + int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); + + for (int page = 0; page < totalPages; page++) { - foreach (object[] item in arr) + // 分页获取当前批次键 + var batchKeys = redisKeys + .Skip(page * pageSize) + .Take(pageSize) + .ToArray(); + + // 执行Lua脚本获取当前页数据 + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); + if (merterResult == null) { - string key = (string)item[0];//集中器地址对应的Redis缓存Key - object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; - string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102"); + } - for (int i = 0; i < fieldsAndValues.Length; i += 2) + // 解析当前页结果 + if (merterResult is object[] arr) + { + foreach (object[] item in arr) { - string meterld = (string)fieldsAndValues[i];//表ID - string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 + string key = (string)item[0]; + object[] fieldsAndValues = (object[])item[1]; + var redisCacheKey = string.Format( + RedisConst.CacheMeterInfoKey, + systemType, + serverTagName, + meterType, + timeDensity + ); + string focusAddress = key.Replace(redisCacheKey, ""); - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) + for (int i = 0; i < fieldsAndValues.Length; i += 2) { - meterInfo = meterStr.Deserialize()!; - } - if (meterInfo != null) - { - meterInfos.Add(meterInfo); - } - else - { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-103"); + string meterId = (string)fieldsAndValues[i]; + string meterStr = (string)fieldsAndValues[i + 1]; + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterInfos.Add(meterInfo); + } + else + { + throw new Exception( + $"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}),-103" + ); + } } } } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104"); + } } return meterInfos; diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index ac09237..7013d37 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -13,13 +13,13 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDBProvider.Context; using Microsoft.Extensions.Logging; -using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using System.Diagnostics.Metrics; +using JiShe.CollectBus.Common.DeviceBalanceControl; namespace JiShe.CollectBus.Samples; diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 7e7626c..df03914 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -103,8 +103,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 var tempArryay = item.Split(":"); - string meteryType = tempArryay[3];//表计类别 - int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率 + string meteryType = tempArryay[4];//表计类别 + int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 + if(timeDensity > 15) + { + timeDensity = 15; + } //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity)) @@ -128,6 +132,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { + var timer = Stopwatch.StartNew(); // 解析结果(结果为嵌套数组) var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); if (meterInfos == null || meterInfos.Count <= 0) @@ -136,16 +141,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - - // 处理数据 - await DeviceGroupBalanceControl.ProcessGenericListAsync( - items: meterInfos, - deviceIdSelector: data => data.FocusAddress, - processor: (data, threadId) => - { - _= AmmerterCreatePublishTask(timeDensity, data); - } - ); + + + //处理数据 + //await DeviceGroupBalanceControl.ProcessGenericListAsync( + // items: meterInfos, + // deviceIdSelector: data => data.FocusAddress, + // processor: (data, threadId) => + // { + // _= AmmerterCreatePublishTask(timeDensity, data); + // } + //); + + timer.Stop(); + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); + } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { @@ -758,7 +768,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); } - await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } /// diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 62ede1d..cba4f41 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.GatherItem; diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index 05fd90d..2857422 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -27,14 +27,14 @@ namespace JiShe.CollectBus.Workers { _logger = logger; RecurringJobId = nameof(CreateToBeIssueTaskWorker); - CronExpression = $"{10}/* * * * *"; + CronExpression = $"*/{1} * * * *"; this._scheduledMeterReadingService = scheduledMeterReadingService; } public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - await _scheduledMeterReadingService.CreateToBeIssueTasks(); + // await _scheduledMeterReadingService.CreateToBeIssueTasks(); } } }