From 705a5cbbf727ed5dd88bffcef63a50301b8cc9a8 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 12 May 2025 23:18:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=AE=BE=E5=A4=87=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E4=BF=A1=E6=81=AFRedis=E6=95=B0=E6=8D=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RedisDataCache/IRedisDataCacheService.cs | 18 ++++++ .../RedisDataCache/RedisDataCacheService.cs | 63 ++++++++++++++++++- .../BasicScheduledMeterReadingService.cs | 25 +++++++- .../Consts/RedisConst.cs | 4 +- 4 files changed, 103 insertions(+), 7 deletions(-) diff --git a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index b5f492c..d870202 100644 --- a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.IotSystems.Ammeters; using System; using System.Collections.Generic; using System.Linq; @@ -34,14 +35,31 @@ namespace JiShe.CollectBus.Application.Contracts /// 主数据存储Hash缓存Key /// Set索引缓存Key /// ZSET索引缓存Key + /// 设备缓存信息 /// 待缓存数据集合 /// Task BatchInsertDataAsync( string redisHashCacheKey, string redisSetIndexCacheKey, string redisZSetScoresIndexCacheKey, + string redisDeviceInfoHashCacheKey, IEnumerable items) where T : DeviceCacheBasicModel; + + /// + /// 批量添加数据 + /// + /// + /// Set索引缓存Key + /// 设备缓存信息 + /// 待缓存数据集合 + /// + Task BatchInsertDataAsync2( + string redisSetIndexCacheKey, + string redisDeviceInfoHashCacheKey, + Dictionary> items) where T : DeviceCacheBasicModel; + + /// /// 删除缓存信息 /// diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index ebf1d2b..d648c89 100644 --- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -16,6 +16,7 @@ using Volo.Abp.DependencyInjection; using static FreeSql.Internal.GlobalFilter; using static System.Runtime.InteropServices.JavaScript.JSType; using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application; +using JiShe.CollectBus.IotSystems.Ammeters; namespace JiShe.CollectBus.RedisDataCache { @@ -102,13 +103,16 @@ namespace JiShe.CollectBus.RedisDataCache string redisHashCacheKey, string redisSetIndexCacheKey, string redisZSetScoresIndexCacheKey, + string redisDeviceInfoHashCacheKey, IEnumerable items) where T : DeviceCacheBasicModel { if (items == null || items.Count() <= 0 || string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) - || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey) + || string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey) + ) { _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101"); return; @@ -131,10 +135,65 @@ namespace JiShe.CollectBus.RedisDataCache pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize()); // Set索引缓存 - pipe.SAdd(redisSetIndexCacheKey, item.MemberId); + pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}"); // ZSET索引缓存Key pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId); + + //设备信息缓存 + pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize()); + + } + pipe.EndPipe(); + } + semaphore.Release(); + }); + } + + await Task.CompletedTask; + } + + /// + /// 批量添加数据 + /// + /// + /// Set索引缓存Key + /// 待缓存数据集合 + /// + public async Task BatchInsertDataAsync2( + string redisSetIndexCacheKey, + string redisDeviceInfoHashCacheKey, + Dictionary> items) where T : DeviceCacheBasicModel + { + if (items == null + || items.Count() <= 0 + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey) + ) + { + _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101"); + return; + } + + const int BATCH_SIZE = 1000; // 每批1000条 + var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); + + foreach (var batch in items.Batch(BATCH_SIZE)) + { + await semaphore.WaitAsync(); + + _ = Task.Run(() => + { + using (var pipe = Instance.StartPipe()) + { + foreach (var item in batch) + { + // Set索引缓存 + pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}"); + + //设备信息缓存 + pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize()); + } pipe.EndPipe(); } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index cd694cf..8e02627 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - return; + //return; // 创建取消令牌源 //var cts = new CancellationTokenSource(); @@ -412,6 +412,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } + string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}"; + Dictionary> keyValuePairs = new Dictionary>(); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { @@ -476,13 +478,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading } ammeterInfos.Add(ammeter); + + if (!keyValuePairs.ContainsKey(ammeter.FocusAddress)) + { + keyValuePairs[ammeter.FocusAddress] = new List() {ammeter }; + } + else + { + keyValuePairs[ammeter.FocusAddress].Add(ammeter); + } } } + //await _redisDataCacheService.BatchInsertDataAsync2( + // redisCacheMeterInfoSetIndexKey, + // redisDeviceInfoHashCacheKey, + // keyValuePairs); + await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, - redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); + redisCacheMeterInfoZSetScoresIndexKey, + redisDeviceInfoHashCacheKey, + ammeterInfos); } //初始化设备组负载控制 @@ -1126,7 +1144,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } - + string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}"; foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { List watermeterInfo = new List(); @@ -1155,6 +1173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, + redisDeviceInfoHashCacheKey, redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo); } } diff --git a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs index 60340bc..be4ae4b 100644 --- a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -29,9 +29,9 @@ namespace JiShe.CollectBus.Common.Consts public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; /// - /// 集中器连接信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记 + /// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记 /// - public const string ConcentratorCacheHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:Concentrator"; + public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo"; public const string MeterInfo = "MeterInfo";