From 849e0f9ac21c9531aae6a6d175c9fefdbcec71a2 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 16 Apr 2025 23:51:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RedisDataCache/RedisDataCacheService.cs | 208 +++++++++++------- .../BasicScheduledMeterReadingService.cs | 74 +++---- .../Extensions/EnumerableExtensions.cs | 21 ++ .../Ammeters/AmmeterInfo.cs | 17 +- src/JiShe.CollectBus.Host/appsettings.json | 2 +- 5 files changed, 201 insertions(+), 121 deletions(-) diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 4d78706..8f50017 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -12,6 +12,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using Volo.Abp.DependencyInjection; +using static FreeSql.Internal.GlobalFilter; +using static System.Runtime.InteropServices.JavaScript.JSType; +using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application; namespace JiShe.CollectBus.RedisDataCache { @@ -118,7 +121,7 @@ namespace JiShe.CollectBus.RedisDataCache await semaphore.WaitAsync(); _ = Task.Run(() => - { + { using (var pipe = Instance.StartPipe()) { foreach (var item in batch) @@ -372,129 +375,172 @@ namespace JiShe.CollectBus.RedisDataCache /// 最后一个索引 /// 最后一个唯一标识 /// 排序方式 - /// + /// public async Task> GetAllPagedData( - string redisHashCacheKey, - string redisZSetScoresIndexCacheKey, - int pageSize = 1000, - decimal? lastScore = null, - string lastMember = null, - bool descending = true) - where T : DeviceCacheBasicModel + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel { // 参数校验(保持不变) - if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || + string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) { - _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); + _logger.LogError("参数异常: HashKey或ZSetKey为空"); return null; } if (pageSize < 1 || pageSize > 10000) - { - _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102"); - return null; - } + throw new ArgumentException("分页大小应在1-10000之间"); + // 更新后的Lua脚本 var luaScript = @" - local command = ARGV[1] - local range_start = ARGV[2] - local range_end = ARGV[3] - local limit = tonumber(ARGV[4]) - local last_score = ARGV[5] - local last_member = ARGV[6] +local command = ARGV[1] +local range_start = ARGV[2] +local range_end = ARGV[3] +local limit = tonumber(ARGV[4]) +local last_score = ARGV[5] +local last_member = ARGV[6] - -- 处理相同分数下的字典序分页 - if last_score ~= '' and last_member ~= '' then +-- 调整range_start,当有last_score且没有last_member时 +if last_score ~= '' and last_member == '' then + if command == 'ZRANGEBYSCORE' then + range_start = '('..last_score + else + range_start = '('..last_score + end +end + +local members +if command == 'ZRANGEBYSCORE' then + members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) +else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) +end + +if #members == 0 then return {0,{},{},{}} end + +local result_members = {} +local result_scores = {} + +-- 如果有last_member,进行过滤 +if last_member ~= '' and last_score ~= '' then + for i = 1, #members, 2 do + local member = members[i] + local score = members[i+1] + local include = true + + if score == last_score then if command == 'ZRANGEBYSCORE' then - range_start = '(' .. last_score - range_end = '(' .. last_score .. ' ' .. last_member + -- 升序:member必须 > last_member + if member <= last_member then + include = false + end else - range_start = '(' .. last_score .. ' ' .. last_member - range_end = '(' .. last_score + -- 降序:member必须 < last_member + if member >= last_member then + include = false + end end end - -- 执行范围查询 - local members - if command == 'ZRANGEBYSCORE' then - members = redis.call(command, KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit) - else - members = redis.call(command, KEYS[1], range_end, range_start, - 'WITHSCORES', 'LIMIT', 0, limit) + if include then + table.insert(result_members, member) + table.insert(result_scores, score) end + end +else + for i = 1, #members, 2 do + table.insert(result_members, members[i]) + table.insert(result_scores, members[i+1]) + end +end - -- 提取成员和分数 - local result_members = {} - local result_scores = {} - for i = 1, #members, 2 do - table.insert(result_members, members[i]) - table.insert(result_scores, members[i+1]) - end +-- 截取前limit条 +local count = #result_members +if count > limit then + result_members = {unpack(result_members, 1, limit)} + result_scores = {unpack(result_scores, 1, limit)} +end - -- 获取Hash数据 - local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) +if #result_members == 0 then + return {0, {}, {}, {}} +end - return { - #result_members, - result_members, - result_scores, - hash_data - }"; +-- 获取Hash数据 +local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) +return {#result_members, result_members, result_scores, hash_data}"; - //正确设置范围参数 + // 修复点:根据是否传递lastMember决定rangeStart是否排他 string rangeStart, rangeEnd; if (descending) { - rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf"; - rangeEnd = "-inf"; // 降序时固定为最小值 + rangeStart = lastScore.HasValue + ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString()) + : "+inf"; + rangeEnd = "-inf"; } else { - rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf"; - rangeEnd = "+inf"; // 升序时固定为最大值 + rangeStart = lastScore.HasValue + ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString()) + : "-inf"; + rangeEnd = "+inf"; } - var result = (object[])await Instance.EvalAsync( - luaScript, + // 执行Lua脚本(保持不变) + var scriptResult = (object[])await Instance.EvalAsync(luaScript, new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, new object[] { - descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", - rangeStart, - rangeEnd, - (pageSize + 1).ToString(), // 多取1条用于判断hasNext - lastScore?.ToString() ?? "", - lastMember ?? "" + descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", + rangeStart, + rangeEnd, + (pageSize + 1).ToString(), + lastScore?.ToString() ?? "", + lastMember ?? "" }); - if ((long)result[0] == 0) + // 处理空结果(保持不变) + if ((long)scriptResult[0] == 0) return new BusCacheGlobalPagedResult { Items = new List() }; - // 处理结果集 - var members = ((object[])result[1]).Cast().ToList(); - var scores = ((object[])result[2]).Cast().Select(decimal.Parse).ToList(); - var hashData = ((object[])result[3]).Cast().ToList(); + // 数据提取(保持不变) + var members = ((object[])scriptResult[1]).Cast().ToList(); + var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList(); + var hashData = ((object[])scriptResult[3]).Cast().ToList(); - //合并有效数据并处理游标 - var validItems = members.Zip(hashData, (m, h) => - !string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize(h) : null) - .Where(x => x != null) - .Take(pageSize + 1) - .ToList(); + // 反序列化处理(保持不变) + var validItems = members.Select((m, i) => + { + try + { + return !string.IsNullOrEmpty(hashData[i]) + ? BusJsonSerializer.Deserialize(hashData[i]) + : null; + } + catch (Exception ex) + { + _logger.LogError($"反序列化失败: {m} - {ex.Message}"); + return null; + } + }).Where(x => x != null).Take(pageSize + 1).ToList(); + // 分页逻辑(保持不变) var hasNext = validItems.Count > pageSize; var actualItems = hasNext ? validItems.Take(pageSize) : validItems; - // 计算下一页起始点 - string nextMember = null; + // 计算下一页锚点(保持不变) decimal? nextScore = null; - if (hasNext) + string nextMember = null; + if (hasNext && actualItems.Any()) { - // 获取实际返回的最后一条有效数据 - var lastValidIndex = actualItems.Count() - 1; - nextMember = members[lastValidIndex]; - nextScore = scores[lastValidIndex]; + var lastIndex = Math.Min(members.Count - 1, pageSize); + nextScore = scores[lastIndex]; + nextMember = members[lastIndex]; } return new BusCacheGlobalPagedResult diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index ce4635b..4f8ae1f 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -213,50 +213,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task InitAmmeterCacheData(string gatherCode = "") { #if DEBUG - //var timeDensity = "15"; - //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; - ////获取缓存中的电表信息 - //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; + var timeDensity = "15"; + string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; - //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); - ////List focusAddressDataLista = new List(); - //List meterInfos = new List(); - //foreach (var item in tempMeterInfos) - //{ - // var tempData = item.Adapt(); - // tempData.FocusId = item.FocusID; - // tempData.MeterId = item.Id; - // meterInfos.Add(tempData); - // //focusAddressDataLista.Add(item.FocusAddress); - //} + var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + //List focusAddressDataLista = new List(); + List meterInfos = new List(); + foreach (var item in tempMeterInfos) + { + var tempData = item.Adapt(); + tempData.FocusId = item.FocusID; + tempData.MeterId = item.Id; + meterInfos.Add(tempData); + //focusAddressDataLista.Add(item.FocusAddress); + } //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - var timeDensity = "15"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + //var timeDensity = "15"; + //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - decimal? cursor = null; - string member = null; - bool hasNext; - List meterInfos = new List(); - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //decimal? cursor = null; + //string member = null; + //bool hasNext; + //List meterInfos = new List(); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); #else var meterInfos = await GetAmmeterInfoList(gatherCode); diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs index 80d2f23..b17e9c2 100644 --- a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs @@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions if (buffer.Count > 0) yield return buffer; } + + //public static IEnumerable> Batch(this IEnumerable source, int batchSize) + //{ + // if (batchSize <= 0) + // throw new ArgumentOutOfRangeException(nameof(batchSize)); + + // using var enumerator = source.GetEnumerator(); + // while (enumerator.MoveNext()) + // { + // yield return GetBatch(enumerator, batchSize); + // } + //} + + //private static IEnumerable GetBatch(IEnumerator enumerator, int batchSize) + //{ + // do + // { + // yield return enumerator.Current; + // batchSize--; + // } while (batchSize > 0 && enumerator.MoveNext()); + //} } } diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index 4ac667d..3df01b5 100644 --- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -1,4 +1,5 @@ -using JiShe.CollectBus.Common.Models; +using FreeSql.DataAnnotations; +using JiShe.CollectBus.Common.Models; using System; using System.Collections.Generic; using System.Linq; @@ -8,7 +9,19 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Ammeters { public class AmmeterInfo: DeviceCacheBasicModel - { + { + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + [Column(IsIgnore = true)] + public override string MemberID => $"{FocusId}:{MeterId}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + [Column(IsIgnore = true)] + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; + /// /// 电表名称 /// diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index bb09204..ae2025b 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -129,7 +129,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus3", + "ServerTagName": "JiSheCollectBus2", "KafkaReplicationFactor": 3, "NumPartitions": 30 } \ No newline at end of file