diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 8f50017..e846ada 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -385,7 +385,7 @@ namespace JiShe.CollectBus.RedisDataCache bool descending = true) where T : DeviceCacheBasicModel { - // 参数校验(保持不变) + // 参数校验 if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) { @@ -396,7 +396,6 @@ namespace JiShe.CollectBus.RedisDataCache if (pageSize < 1 || pageSize > 10000) throw new ArgumentException("分页大小应在1-10000之间"); - // 更新后的Lua脚本 var luaScript = @" local command = ARGV[1] local range_start = ARGV[2] @@ -405,93 +404,74 @@ local limit = tonumber(ARGV[4]) local last_score = ARGV[5] local last_member = ARGV[6] --- 调整range_start,当有last_score且没有last_member时 -if last_score ~= '' and last_member == '' then - if command == 'ZRANGEBYSCORE' then - range_start = '('..last_score - else - range_start = '('..last_score - end -end - +-- 获取原始数据 local members if command == 'ZRANGEBYSCORE' then - members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) + members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) else - members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) end -if #members == 0 then return {0,{},{},{}} end - -local result_members = {} -local result_scores = {} - --- 如果有last_member,进行过滤 -if last_member ~= '' and last_score ~= '' then - for i = 1, #members, 2 do - local member = members[i] - local score = members[i+1] - local include = true - - if score == last_score then - if command == 'ZRANGEBYSCORE' then - -- 升序:member必须 > last_member - if member <= last_member then - include = false - end +-- 过滤数据 +local filtered_members = {} +local count = 0 +for i = 1, #members, 2 do + local member = members[i] + local score = members[i+1] + local include = true + if last_score ~= '' and last_member ~= '' then + if command == 'ZRANGEBYSCORE' then + -- 升序:score > last_score 或 (score == last_score 且 member > last_member) + if score == last_score then + include = member > last_member else - -- 降序:member必须 < last_member - if member >= last_member then - include = false - end + include = tonumber(score) > tonumber(last_score) + end + else + -- 降序:score < last_score 或 (score == last_score 且 member < last_member) + if score == last_score then + include = member < last_member + else + include = tonumber(score) < tonumber(last_score) end end - - if include then - table.insert(result_members, member) - table.insert(result_scores, score) + end + if include then + table.insert(filtered_members, member) + table.insert(filtered_members, score) + count = count + 1 + if count >= limit then + break end end -else - for i = 1, #members, 2 do - table.insert(result_members, members[i]) - table.insert(result_scores, members[i+1]) - end end --- 截取前limit条 -local count = #result_members -if count > limit then - result_members = {unpack(result_members, 1, limit)} - result_scores = {unpack(result_scores, 1, limit)} +-- 提取有效数据 +local result_members, result_scores = {}, {} +for i=1,#filtered_members,2 do + table.insert(result_members, filtered_members[i]) + table.insert(result_scores, filtered_members[i+1]) end -if #result_members == 0 then - return {0, {}, {}, {}} -end +if #result_members == 0 then return {0,{},{},{}} end -- 获取Hash数据 local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) return {#result_members, result_members, result_scores, hash_data}"; - // 修复点:根据是否传递lastMember决定rangeStart是否排他 + // 调整范围构造逻辑(移除排他符号) string rangeStart, rangeEnd; if (descending) { - rangeStart = lastScore.HasValue - ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString()) - : "+inf"; + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; rangeEnd = "-inf"; } else { - rangeStart = lastScore.HasValue - ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString()) - : "-inf"; + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; rangeEnd = "+inf"; } - // 执行Lua脚本(保持不变) var scriptResult = (object[])await Instance.EvalAsync(luaScript, new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, new object[] @@ -499,21 +479,19 @@ return {#result_members, result_members, result_scores, hash_data}"; descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", rangeStart, rangeEnd, - (pageSize + 1).ToString(), + (pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页 lastScore?.ToString() ?? "", lastMember ?? "" }); - // 处理空结果(保持不变) if ((long)scriptResult[0] == 0) return new BusCacheGlobalPagedResult { Items = new List() }; - // 数据提取(保持不变) + // 处理结果集 var members = ((object[])scriptResult[1]).Cast().ToList(); var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList(); var hashData = ((object[])scriptResult[3]).Cast().ToList(); - // 反序列化处理(保持不变) var validItems = members.Select((m, i) => { try @@ -527,18 +505,17 @@ return {#result_members, result_members, result_scores, hash_data}"; _logger.LogError($"反序列化失败: {m} - {ex.Message}"); return null; } - }).Where(x => x != null).Take(pageSize + 1).ToList(); + }).Where(x => x != null).ToList(); - // 分页逻辑(保持不变) var hasNext = validItems.Count > pageSize; var actualItems = hasNext ? validItems.Take(pageSize) : validItems; - // 计算下一页锚点(保持不变) + // 修正分页锚点索引 decimal? nextScore = null; string nextMember = null; if (hasNext && actualItems.Any()) { - var lastIndex = Math.Min(members.Count - 1, pageSize); + var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引 nextScore = scores[lastIndex]; nextMember = members[lastIndex]; } @@ -551,7 +528,7 @@ return {#result_members, result_members, result_scores, hash_data}"; NextMember = nextMember, TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), PageSize = pageSize, - }; + }; } ///// diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4f8ae1f..d32cdb2 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -213,50 +213,53 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task InitAmmeterCacheData(string gatherCode = "") { #if DEBUG - var timeDensity = "15"; - string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; + //var timeDensity = "15"; + //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; + ////获取缓存中的电表信息 + //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); - //List focusAddressDataLista = new List(); - List meterInfos = new List(); - foreach (var item in tempMeterInfos) - { - var tempData = item.Adapt(); - tempData.FocusId = item.FocusID; - tempData.MeterId = item.Id; - meterInfos.Add(tempData); - //focusAddressDataLista.Add(item.FocusAddress); - } + //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + ////List focusAddressDataLista = new List(); + //List meterInfos = new List(); + //foreach (var item in tempMeterInfos) + //{ + // var tempData = item.Adapt(); + // tempData.FocusId = item.FocusID; + // tempData.MeterId = item.Id; + // meterInfos.Add(tempData); + // //focusAddressDataLista.Add(item.FocusAddress); + //} //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - //var timeDensity = "15"; - //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var timeDensity = "15"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var timer1 = Stopwatch.StartNew(); + decimal? cursor = null; + string member = null; + bool hasNext; + List meterInfos = new List(); + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); - //decimal? cursor = null; - //string member = null; - //bool hasNext; - //List meterInfos = new List(); - //do - //{ - // var page = await _redisDataCacheService.GetAllPagedData( - // redisCacheMeterInfoHashKeyTemp, - // redisCacheMeterInfoZSetScoresIndexKeyTemp, - // pageSize: 1000, - // lastScore: cursor, - // lastMember: member); + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); - // meterInfos.AddRange(page.Items); - // cursor = page.HasNext ? page.NextScore : null; - // member = page.HasNext ? page.NextMember : null; - // hasNext = page.HasNext; - //} while (hasNext); + timer1.Stop(); + _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); #else var meterInfos = await GetAmmeterInfoList(gatherCode); diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index ae2025b..bb09204 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -129,7 +129,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus2", + "ServerTagName": "JiSheCollectBus3", "KafkaReplicationFactor": 3, "NumPartitions": 30 } \ No newline at end of file