From 57123d653c62bb8817f129435f1cefc5b774c11a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=9B=8A?= Date: Tue, 15 Apr 2025 23:20:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BasicScheduledMeterReadingService.cs | 1 - .../Models/BusCacheGlobalPagedResult.cs | 7 +- .../Models/BusPagedResult.cs | 15 + .../Models/DeviceCacheBasicModel.cs | 4 +- .../FreeRedisProvider.cs | 544 +++++++++++++----- .../IFreeRedisProvider.cs | 60 ++ 6 files changed, 491 insertions(+), 140 deletions(-) diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 33e70f9..2fd9781 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -21,7 +21,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; -using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.ScheduledMeterReading { diff --git a/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs index ed42529..303b2e4 100644 --- a/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs +++ b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs @@ -17,6 +17,11 @@ namespace JiShe.CollectBus.Common.Models /// public List Items { get; set; } + /// + /// 总条数 + /// + public long TotalCount { get; set; } + /// /// 是否有下一页 /// @@ -25,7 +30,7 @@ namespace JiShe.CollectBus.Common.Models /// /// 下一页的分页索引 /// - public long? NextScore { get; set; } + public decimal? NextScore { get; set; } /// /// 下一页的分页索引 diff --git a/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs index 2e68b14..f2bd1a3 100644 --- a/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs +++ b/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs @@ -31,5 +31,20 @@ namespace JiShe.CollectBus.Common.Models /// 数据集合 /// public IEnumerable Items { get; set; } + + /// + /// 是否有下一页 + /// + public bool HasNext { get; set; } + + /// + /// 下一页的分页索引 + /// + public decimal? NextScore { get; set; } + + /// + /// 下一页的分页索引 + /// + public string NextMember { get; set; } } } diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index 06a427b..59011de 100644 --- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -22,8 +22,8 @@ namespace JiShe.CollectBus.Common.Models public int MeterId { get; set; } /// - /// 唯一标识 + /// 唯一标识,是redis ZSet和Set memberid /// - public virtual string UniqueId => $"{FocusId}:{MeterId}"; + public virtual string MemberID => $"{FocusId}:{MeterId}"; } } diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs index 1e07e20..cebc9c4 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs @@ -2,12 +2,13 @@ using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Extensions; -using JiShe.CollectBus.FreeRedisProvider.Options; +using JiShe.CollectBus.FreeRedisProvider.Options; using Microsoft.Extensions.Options; using System.Diagnostics; using System.Text.Json; using Volo.Abp.DependencyInjection; using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Collections.Concurrent; namespace JiShe.CollectBus.FreeRedisProvider { @@ -70,12 +71,12 @@ namespace JiShe.CollectBus.FreeRedisProvider { throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101"); } - + // 计算组合score(分类ID + 时间戳) var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks; - + //全局索引写入 long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); @@ -83,16 +84,16 @@ namespace JiShe.CollectBus.FreeRedisProvider using (var trans = Instance.Multi()) { // 主数据存储Hash - trans.HSet(redisCacheKey, data.UniqueId, data.Serialize()); - + trans.HSet(redisCacheKey, data.MemberID, data.Serialize()); + // 分类索引 - trans.SAdd(redisCacheFocusIndexKey, data.UniqueId); + trans.SAdd(redisCacheFocusIndexKey, data.MemberID); // 排序索引使用ZSET - trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.UniqueId); + trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID); //全局索引 - trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.UniqueId); + trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID); var results = trans.Exec(); @@ -122,6 +123,16 @@ namespace JiShe.CollectBus.FreeRedisProvider IEnumerable items, DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel { + if (items == null + || items.Count() <=0 + || string.IsNullOrWhiteSpace(redisCacheKey) + || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) + || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + { + throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101"); + } + const int BATCH_SIZE = 1000; // 每批1000条 var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); @@ -144,16 +155,16 @@ namespace JiShe.CollectBus.FreeRedisProvider long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); // 主数据存储Hash - pipe.HSet(redisCacheKey, item.UniqueId, item.Serialize()); + pipe.HSet(redisCacheKey, item.MemberID, item.Serialize()); // 分类索引 - pipe.SAdd(redisCacheFocusIndexKey, item.UniqueId); + pipe.SAdd(redisCacheFocusIndexKey, item.MemberID); // 排序索引使用ZSET - pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.UniqueId); + pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID); //全局索引 - pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.UniqueId); + pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID); } pipe.EndPipe(); } @@ -164,29 +175,108 @@ namespace JiShe.CollectBus.FreeRedisProvider await Task.CompletedTask; } - public async Task UpdateMeterData( + /// + /// 删除指定redis缓存key的缓存数据 + /// + /// + /// 主数据存储Hash缓存Key + /// 集中器索引Set缓存Key + /// 集中器排序索引ZSET缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 表计信息 + /// + public async Task RemoveMeterData( string redisCacheKey, - string oldCategoryIndexKey, - string newCategoryIndexKey, - string memberId, // 唯一标识(格式:"分类ID:GUID") - T newData, - int? newCategoryId = null, - DateTimeOffset? newTimestamp = null) + string redisCacheFocusIndexKey, + string redisCacheScoresIndexKey, + string redisCacheGlobalIndexKey, + T data) where T : DeviceCacheBasicModel { - // 参数校验 - if (string.IsNullOrWhiteSpace(memberId)) - throw new ArgumentException("Invalid member ID"); + + if (data == null + || string.IsNullOrWhiteSpace(redisCacheKey) + || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) + || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + { + throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101"); + } + + const string luaScript = @" + local mainKey = KEYS[1] + local focusIndexKey = KEYS[2] + local scoresIndexKey = KEYS[3] + local globalIndexKey = KEYS[4] + local member = ARGV[1] + + local deleted = 0 + if redis.call('HDEL', mainKey, member) > 0 then + deleted = 1 + end + + redis.call('SREM', focusIndexKey, member) + redis.call('ZREM', scoresIndexKey, member) + redis.call('ZREM', globalIndexKey, member) + return deleted + "; + + var keys = new[] + { + redisCacheKey, + redisCacheFocusIndexKey, + redisCacheScoresIndexKey, + redisCacheGlobalIndexKey + }; + + var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID }); + + if ((int)result == 0) + throw new KeyNotFoundException("指定数据不存在"); + } + + /// + /// 修改表计缓存信息 + /// + /// + /// 主数据存储Hash缓存Key + /// 旧集中器索引Set缓存Key + /// 新集中器索引Set缓存Key + /// 集中器排序索引ZSET缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 表计信息 + /// 可选时间戳 + /// + public async Task UpdateMeterData( + string redisCacheKey, + string oldRedisCacheFocusIndexKey, + string newRedisCacheFocusIndexKey, + string redisCacheScoresIndexKey, + string redisCacheGlobalIndexKey, + T newData, + DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel + { + if (newData == null + || string.IsNullOrWhiteSpace(redisCacheKey) + || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey) + || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey) + || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + { + throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101"); + } var luaScript = @" local mainKey = KEYS[1] - local scoreKey = KEYS[2] - local oldIndex = KEYS[3] - local newIndex = KEYS[4] + local oldFocusIndexKey = KEYS[2] + local newFocusIndexKey = KEYS[3] + local scoresIndexKey = KEYS[4] + local globalIndexKey = KEYS[5] local member = ARGV[1] local newData = ARGV[2] local newScore = ARGV[3] + local newGlobalScore = ARGV[4] - -- 校验旧数据是否存在 + -- 校验存在性 if redis.call('HEXISTS', mainKey, member) == 0 then return 0 end @@ -194,70 +284,67 @@ namespace JiShe.CollectBus.FreeRedisProvider -- 更新主数据 redis.call('HSET', mainKey, member, newData) - -- 处理分类变更 + -- 处理变更 if newScore ~= '' then -- 删除旧索引 - redis.call('SREM', oldIndex, member) - -- 更新排序分数 - redis.call('ZADD', scoreKey, newScore, member) + redis.call('SREM', oldFocusIndexKey, member) + redis.call('ZREM', scoresIndexKey, member) + -- 添加新索引 - redis.call('SADD', newIndex, member) + redis.call('SADD', newFocusIndexKey, member) + redis.call('ZADD', scoresIndexKey, newScore, member) + end + + -- 更新全局索引 + if newGlobalScore ~= '' then + -- 删除旧索引 + redis.call('ZREM', globalIndexKey, member) + + -- 添加新索引 + redis.call('ZADD', globalIndexKey, newGlobalScore, member) end return 1 "; - // 计算新score(当分类或时间变化时) - long? newScoreValue = null; - if (newCategoryId.HasValue || newTimestamp.HasValue) - { - var parts = memberId.Split(':'); - var oldCategoryId = int.Parse(parts[0]); - - var actualCategoryId = newCategoryId ?? oldCategoryId; - var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; - - newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks; - } + var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; + var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds(); + var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks; var result = await Instance.EvalAsync(luaScript, new[] { redisCacheKey, - $"{redisCacheKey}_scores", - oldCategoryIndexKey, - newCategoryIndexKey + oldRedisCacheFocusIndexKey, + newRedisCacheFocusIndexKey, + redisCacheScoresIndexKey, + redisCacheGlobalIndexKey }, - new[] + new object[] { - memberId, + newData.MemberID, newData.Serialize(), - newScoreValue?.ToString() ?? "" + newScoreValue.ToString() ?? "", + newGlobalScore.ToString() ?? "" }); - // 如果时间戳变化则更新全局索引 - if (newTimestamp.HasValue) - { - long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds(); - await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId); - } - if ((int)result == 0) - throw new KeyNotFoundException("指定数据不存在"); + { + throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在"); + } } - - - public async Task> GetMeterZSetPagedData( + + public async Task> SingleGetMeterPagedData( string redisCacheKey, - string redisCacheIndexKey, - int categoryId, + string redisCacheScoresIndexKey, + int focusId, int pageSize = 10, int pageIndex = 1, bool descending = true) { // 计算score范围 - long minScore = (long)categoryId << 32; - long maxScore = ((long)categoryId + 1) << 32; + long minScore = (long)focusId << 32; + long maxScore = ((long)focusId + 1) << 32; // 分页参数计算 int start = (pageIndex - 1) * pageSize; @@ -265,13 +352,13 @@ namespace JiShe.CollectBus.FreeRedisProvider // 获取排序后的member列表 var members = descending ? await Instance.ZRevRangeByScoreAsync( - $"{redisCacheKey}_scores", + redisCacheScoresIndexKey, maxScore, minScore, start, pageSize) : await Instance.ZRangeByScoreAsync( - $"{redisCacheKey}_scores", + redisCacheScoresIndexKey, minScore, maxScore, start, @@ -284,7 +371,7 @@ namespace JiShe.CollectBus.FreeRedisProvider // 总数统计优化 var total = await Instance.ZCountAsync( - $"{redisCacheKey}_scores", + redisCacheScoresIndexKey, minScore, maxScore); @@ -298,105 +385,290 @@ namespace JiShe.CollectBus.FreeRedisProvider } - public async Task RemoveMeterZSetData( - string redisCacheKey, - string redisCacheIndexKey, - string uniqueId) // 改为基于唯一标识删除 - { - // 原子操作 - var luaScript = @" - local mainKey = KEYS[1] - local scoreKey = KEYS[2] - local indexKey = KEYS[3] - local member = ARGV[1] - - redis.call('HDEL', mainKey, member) - redis.call('ZREM', scoreKey, member) - redis.call('SREM', indexKey, member) - return 1 - "; - - var keys = new[] - { - redisCacheKey, - $"{redisCacheKey}_scores", - redisCacheIndexKey - }; - - var result = await Instance.EvalAsync(luaScript, - keys, - new[] { uniqueId }); - - if ((int)result != 1) - throw new Exception("删除操作失败"); - } - - public async Task> GetGlobalPagedData( + public async Task> GetFocusPagedData( string redisCacheKey, + string redisCacheScoresIndexKey, + int focusId, int pageSize = 10, long? lastScore = null, string lastMember = null, - bool descending = true) + bool descending = true) where T : DeviceCacheBasicModel { - const string zsetKey = "global_data_all"; - - // 分页参数处理 - var (startScore, excludeMember) = descending - ? (lastScore ?? long.MaxValue, lastMember) - : (lastScore ?? 0, lastMember); + // 计算分数范围 + long minScore = (long)focusId << 32; + long maxScore = ((long)focusId + 1) << 32; // 获取成员列表 - string[] members; - if (descending) + var members = await GetSortedMembers( + redisCacheScoresIndexKey, + minScore, + maxScore, + pageSize, + lastScore, + lastMember, + descending); + + // 批量获取数据 + var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems); + + return new BusPagedResult { - members = await Instance.ZRevRangeByScoreAsync( + Items = dataDict, + TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore), + HasNext = members.HasNext, + NextScore = members.NextScore, + NextMember = members.NextMember + }; + } + + private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)> + GetSortedMembers( + string zsetKey, + long minScore, + long maxScore, + int pageSize, + long? lastScore, + string lastMember, + bool descending) + { + var querySize = pageSize + 1; + var (startScore, exclude) = descending + ? (lastScore ?? maxScore, lastMember) + : (lastScore ?? minScore, lastMember); + + var members = descending + ? await Instance.ZRevRangeByScoreAsync( zsetKey, max: startScore, - min: 0, + min: minScore, offset: 0, - count: pageSize + 1); - } - else - { - members = await Instance.ZRangeByScoreAsync( + count: querySize) + : await Instance.ZRangeByScoreAsync( zsetKey, min: startScore, - max: long.MaxValue, + max: maxScore, offset: 0, - count: pageSize + 1); + count: querySize); + + var hasNext = members.Length > pageSize; + var currentItems = members.Take(pageSize).ToArray(); + + var nextCursor = currentItems.Any() + ? await GetNextCursor(zsetKey, currentItems.Last(), descending) + : (null, null); + + return (currentItems, hasNext, nextCursor.score, nextCursor.member); + } + + private async Task GetTotalCount(string zsetKey, long min, long max) + { + // 缓存计数优化 + var cacheKey = $"{zsetKey}_count_{min}_{max}"; + var cached = await Instance.GetAsync(cacheKey); + + if (cached.HasValue) + return cached.Value; + + var count = await Instance.ZCountAsync(zsetKey, min, max); + await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒 + return count; + } + + + public async Task>> BatchGetMeterPagedData( + string redisCacheKey, + string redisCacheScoresIndexKey, + IEnumerable focusIds, + int pageSizePerFocus = 10) where T : DeviceCacheBasicModel + { + var results = new ConcurrentDictionary>(); + var parallelOptions = new ParallelOptions + { + MaxDegreeOfParallelism = Environment.ProcessorCount * 2 + }; + + await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) => + { + var data = await SingleGetMeterPagedData( + redisCacheKey, + redisCacheScoresIndexKey, + focusId, + pageSizePerFocus); + + results.TryAdd(focusId, data); + }); + + return new Dictionary>(results); + } + + /// + /// 通过全局索引分页查询表计缓存数据 + /// + /// + /// 主数据存储Hash缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + public async Task> GetGlobalPagedData( + string redisCacheKey, + string redisCacheGlobalIndexKey, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel + { + // 参数校验增强 + if (string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + { + throw new ArgumentException($"{nameof(GetGlobalPagedData)} 参数异常,-101"); } - // 处理分页结果 - bool hasNext = members.Length > pageSize; - var actualMembers = members.Take(pageSize).ToArray(); + if (pageSize < 1 || pageSize > 1000) + { + throw new ArgumentException($"{nameof(GetGlobalPagedData)} 分页大小应在1-1000之间,-102"); + } - // 批量获取数据(优化版本) - var dataTasks = actualMembers - .Select(m => Instance.HGetAsync(redisCacheKey, m)) - .ToArray(); - await Task.WhenAll(dataTasks); + // 分页参数解析 + var (startScore, excludeMember) = descending + ? (lastScore ?? decimal.MaxValue, lastMember) + : (lastScore ?? 0, lastMember); + + // 游标分页查询 + var (members, hasNext) = await GetPagedMembers( + redisCacheGlobalIndexKey, + pageSize, + startScore, + excludeMember, + descending); + + // 批量获取数据(优化内存分配) + var dataDict = await BatchGetData(redisCacheKey, members); // 获取下一页游标 - (long? nextScore, string nextMember) = actualMembers.Any() - ? await GetNextCursor(zsetKey, actualMembers.Last(), descending) + var nextCursor = members.Any() + ? await GetNextCursor(redisCacheGlobalIndexKey, members.Last(), descending) : (null, null); return new BusCacheGlobalPagedResult { - Items = dataTasks.Select(t => t.Result).ToList(), + Items = members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default) + .Where(x => x != null).ToList(), HasNext = hasNext, - NextScore = nextScore, - NextMember = nextMember + NextScore = nextCursor.score, + NextMember = nextCursor.member }; } - private async Task<(long? score, string member)> GetNextCursor( - string zsetKey, + /// + /// 游标分页查询 + /// + /// + /// 分页数量 + /// 开始索引 + /// 开始唯一标识 + /// 排序方式 + /// + private async Task<(List Members, bool HasNext)> GetPagedMembers( + string redisCacheGlobalIndexKey, + int pageSize, + decimal? startScore, + string excludeMember, + bool descending) + { + const int bufferSize = 50; // 预读缓冲区大小 + + // 使用流式分页(避免OFFSET性能问题) + var members = new List(pageSize + 1); + decimal? currentScore = startScore; + string lastMember = excludeMember; + + while (members.Count < pageSize + 1 && currentScore.HasValue) + { + var querySize = Math.Min(bufferSize, pageSize + 1 - members.Count); + + var batch = descending + ? await Instance.ZRevRangeByScoreAsync( + redisCacheGlobalIndexKey, + max: currentScore.Value, + min: 0, + offset: 0, + count: querySize + ) + : await Instance.ZRangeByScoreAsync( + redisCacheGlobalIndexKey, + min: currentScore.Value, + max: long.MaxValue, + offset: 0, + count: querySize); + + if (!batch.Any()) break; + + members.AddRange(batch); + lastMember = batch.LastOrDefault(); + currentScore = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember); + } + + return ( + members.Take(pageSize).ToList(), + members.Count > pageSize + ); + } + + /// + /// 批量获取指定分页的数据 + /// + /// + /// + /// + /// + private async Task> BatchGetData( + string hashKey, + IEnumerable members) + where T : DeviceCacheBasicModel + { + const int batchSize = 100; + var result = new Dictionary(); + + foreach (var batch in members.Batch(batchSize)) + { + var batchArray = batch.ToArray(); + var values = await Instance.HMGetAsync(hashKey, batchArray); + + for (int i = 0; i < batchArray.Length; i++) + { + if (EqualityComparer.Default.Equals(values[i], default)) continue; + result[batchArray[i]] = values[i]; + } + } + + return result; + } + + /// + /// 获取下一页游标 + /// + /// 全局索引Key + /// 最后一个唯一标识 + /// 排序方式 + /// + private async Task<(decimal? score, string member)> GetNextCursor( + string redisCacheGlobalIndexKey, string lastMember, bool descending) { - var score = await Instance.ZScoreAsync(zsetKey, lastMember); - return (score.HasValue ? (long)score.Value : null, lastMember); + if (string.IsNullOrWhiteSpace(lastMember)) + { + return (null, null); + } + + var score = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember); + return score.HasValue + ? (Convert.ToInt64(score.Value), lastMember) + : (null, null); } } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs index 36f9a81..dc0aaa3 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs @@ -48,6 +48,66 @@ namespace JiShe.CollectBus.FreeRedisProvider string redisCacheGlobalIndexKey, IEnumerable items, DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel; + + /// + /// 删除指定redis缓存key的缓存数据 + /// + /// + /// 主数据存储Hash缓存Key + /// 集中器索引Set缓存Key + /// 集中器排序索引ZSET缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 表计信息 + /// + Task RemoveMeterData( + string redisCacheKey, + string redisCacheFocusIndexKey, + string redisCacheScoresIndexKey, + string redisCacheGlobalIndexKey, + T data) where T : DeviceCacheBasicModel; + + /// + /// 修改表计缓存信息 + /// + /// + /// 主数据存储Hash缓存Key + /// 旧集中器索引Set缓存Key + /// 新集中器索引Set缓存Key + /// 集中器排序索引ZSET缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 表计信息 + /// 可选时间戳 + /// + Task UpdateMeterData( + string redisCacheKey, + string oldRedisCacheFocusIndexKey, + string newRedisCacheFocusIndexKey, + string redisCacheScoresIndexKey, + string redisCacheGlobalIndexKey, + T newData, + DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel; + + + + /// + /// 通过全局索引分页查询表计缓存数据 + /// + /// + /// 主数据存储Hash缓存Key + /// 集中器采集频率分组全局索引ZSet缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + Task> GetGlobalPagedData( + string redisCacheKey, + string redisCacheGlobalIndexKey, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel; } }