解决Redis大批量数据读取总数不准的问题,并将10万记录读取时间控制在23秒以内。

This commit is contained in:
ChenYi 2025-04-17 08:20:54 +08:00
parent 849e0f9ac2
commit 78127f7cea
3 changed files with 87 additions and 107 deletions

View File

@ -385,7 +385,7 @@ namespace JiShe.CollectBus.RedisDataCache
bool descending = true) bool descending = true)
where T : DeviceCacheBasicModel where T : DeviceCacheBasicModel
{ {
// 参数校验(保持不变) // 参数校验
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{ {
@ -396,7 +396,6 @@ namespace JiShe.CollectBus.RedisDataCache
if (pageSize < 1 || pageSize > 10000) if (pageSize < 1 || pageSize > 10000)
throw new ArgumentException("分页大小应在1-10000之间"); throw new ArgumentException("分页大小应在1-10000之间");
// 更新后的Lua脚本
var luaScript = @" var luaScript = @"
local command = ARGV[1] local command = ARGV[1]
local range_start = ARGV[2] local range_start = ARGV[2]
@ -405,93 +404,74 @@ local limit = tonumber(ARGV[4])
local last_score = ARGV[5] local last_score = ARGV[5]
local last_member = ARGV[6] local last_member = ARGV[6]
-- range_startlast_score且没有last_member时 --
if last_score ~= '' and last_member == '' then
if command == 'ZRANGEBYSCORE' then
range_start = '('..last_score
else
range_start = '('..last_score
end
end
local members local members
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
else else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit) members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
end end
if #members == 0 then return {0,{},{},{}} end --
local filtered_members = {}
local result_members = {} local count = 0
local result_scores = {}
-- last_member
if last_member ~= '' and last_score ~= '' then
for i = 1, #members, 2 do for i = 1, #members, 2 do
local member = members[i] local member = members[i]
local score = members[i+1] local score = members[i+1]
local include = true local include = true
if last_score ~= '' and last_member ~= '' then
if score == last_score then
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
-- member必须 > last_member -- score > last_score (score == last_score member > last_member)
if member <= last_member then if score == last_score then
include = false include = member > last_member
else
include = tonumber(score) > tonumber(last_score)
end end
else else
-- member必须 < last_member -- score < last_score (score == last_score member < last_member)
if member >= last_member then if score == last_score then
include = false include = member < last_member
else
include = tonumber(score) < tonumber(last_score)
end end
end end
end end
if include then if include then
table.insert(result_members, member) table.insert(filtered_members, member)
table.insert(result_scores, score) table.insert(filtered_members, score)
count = count + 1
if count >= limit then
break
end end
end end
else
for i = 1, #members, 2 do
table.insert(result_members, members[i])
table.insert(result_scores, members[i+1])
end
end end
-- limit条 --
local count = #result_members local result_members, result_scores = {}, {}
if count > limit then for i=1,#filtered_members,2 do
result_members = {unpack(result_members, 1, limit)} table.insert(result_members, filtered_members[i])
result_scores = {unpack(result_scores, 1, limit)} table.insert(result_scores, filtered_members[i+1])
end end
if #result_members == 0 then if #result_members == 0 then return {0,{},{},{}} end
return {0, {}, {}, {}}
end
-- Hash数据 -- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}"; return {#result_members, result_members, result_scores, hash_data}";
// 修复点根据是否传递lastMember决定rangeStart是否排他 // 调整范围构造逻辑(移除排他符号)
string rangeStart, rangeEnd; string rangeStart, rangeEnd;
if (descending) if (descending)
{ {
rangeStart = lastScore.HasValue rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
: "+inf";
rangeEnd = "-inf"; rangeEnd = "-inf";
} }
else else
{ {
rangeStart = lastScore.HasValue rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
: "-inf";
rangeEnd = "+inf"; rangeEnd = "+inf";
} }
// 执行Lua脚本保持不变
var scriptResult = (object[])await Instance.EvalAsync(luaScript, var scriptResult = (object[])await Instance.EvalAsync(luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[] new object[]
@ -499,21 +479,19 @@ return {#result_members, result_members, result_scores, hash_data}";
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart, rangeStart,
rangeEnd, rangeEnd,
(pageSize + 1).ToString(), (pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
lastScore?.ToString() ?? "", lastScore?.ToString() ?? "",
lastMember ?? "" lastMember ?? ""
}); });
// 处理空结果(保持不变)
if ((long)scriptResult[0] == 0) if ((long)scriptResult[0] == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() }; return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 数据提取(保持不变) // 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList(); var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList(); var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList(); var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
// 反序列化处理(保持不变)
var validItems = members.Select((m, i) => var validItems = members.Select((m, i) =>
{ {
try try
@ -527,18 +505,17 @@ return {#result_members, result_members, result_scores, hash_data}";
_logger.LogError($"反序列化失败: {m} - {ex.Message}"); _logger.LogError($"反序列化失败: {m} - {ex.Message}");
return null; return null;
} }
}).Where(x => x != null).Take(pageSize + 1).ToList(); }).Where(x => x != null).ToList();
// 分页逻辑(保持不变)
var hasNext = validItems.Count > pageSize; var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems; var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
// 计算下一页锚点(保持不变) // 修正分页锚点索引
decimal? nextScore = null; decimal? nextScore = null;
string nextMember = null; string nextMember = null;
if (hasNext && actualItems.Any()) if (hasNext && actualItems.Any())
{ {
var lastIndex = Math.Min(members.Count - 1, pageSize); var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
nextScore = scores[lastIndex]; nextScore = scores[lastIndex];
nextMember = members[lastIndex]; nextMember = members[lastIndex];
} }

View File

@ -213,50 +213,53 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG #if DEBUG
var timeDensity = "15"; //var timeDensity = "15";
string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
//获取缓存中的电表信息 ////获取缓存中的电表信息
var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); //var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
//List<string> focusAddressDataLista = new List<string>(); ////List<string> focusAddressDataLista = new List<string>();
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); //List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
foreach (var item in tempMeterInfos) //foreach (var item in tempMeterInfos)
{ //{
var tempData = item.Adapt<AmmeterInfo>(); // var tempData = item.Adapt<AmmeterInfo>();
tempData.FocusId = item.FocusID; // tempData.FocusId = item.FocusID;
tempData.MeterId = item.Id; // tempData.MeterId = item.Id;
meterInfos.Add(tempData); // meterInfos.Add(tempData);
//focusAddressDataLista.Add(item.FocusAddress); // //focusAddressDataLista.Add(item.FocusAddress);
} //}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//var timeDensity = "15"; var timeDensity = "15";
//var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
//var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var timer1 = Stopwatch.StartNew();
decimal? cursor = null;
string member = null;
bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
//decimal? cursor = null; meterInfos.AddRange(page.Items);
//string member = null; cursor = page.HasNext ? page.NextScore : null;
//bool hasNext; member = page.HasNext ? page.NextMember : null;
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); hasNext = page.HasNext;
//do } while (hasNext);
//{
// var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items); timer1.Stop();
// cursor = page.HasNext ? page.NextScore : null; _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);

View File

@ -129,7 +129,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus2", "ServerTagName": "JiSheCollectBus3",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30 "NumPartitions": 30
} }