Compare commits

..

No commits in common. "78127f7cea9975aa131ba007cf6b3d46ef6b9494" and "1a275ec9c3ca87effa7ad64d0c640e5428f396eb" have entirely different histories.

4 changed files with 87 additions and 147 deletions

View File

@ -12,9 +12,6 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
namespace JiShe.CollectBus.RedisDataCache namespace JiShe.CollectBus.RedisDataCache
{ {
@ -377,147 +374,127 @@ namespace JiShe.CollectBus.RedisDataCache
/// <param name="descending">排序方式</param> /// <param name="descending">排序方式</param>
/// <returns></returns> /// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>( public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>(
string redisHashCacheKey, string redisHashCacheKey,
string redisZSetScoresIndexCacheKey, string redisZSetScoresIndexCacheKey,
int pageSize = 1000, int pageSize = 1000,
decimal? lastScore = null, decimal? lastScore = null,
string lastMember = null, string lastMember = null,
bool descending = true) bool descending = true)
where T : DeviceCacheBasicModel where T : DeviceCacheBasicModel
{ {
// 参数校验 // 参数校验(保持不变)
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{ {
_logger.LogError("参数异常: HashKey或ZSetKey为空"); _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
return null; return null;
} }
if (pageSize < 1 || pageSize > 10000) if (pageSize < 1 || pageSize > 10000)
throw new ArgumentException("分页大小应在1-10000之间"); {
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
return null;
}
var luaScript = @" var luaScript = @"
local command = ARGV[1] local command = ARGV[1]
local range_start = ARGV[2] local range_start = ARGV[2]
local range_end = ARGV[3] local range_end = ARGV[3]
local limit = tonumber(ARGV[4]) local limit = tonumber(ARGV[4])
local last_score = ARGV[5] local last_score = ARGV[5]
local last_member = ARGV[6] local last_member = ARGV[6]
-- --
local members if last_score ~= '' and last_member ~= '' then
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) range_start = '(' .. last_score
else range_end = '(' .. last_score .. ' ' .. last_member
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) else
end range_start = '(' .. last_score .. ' ' .. last_member
range_end = '(' .. last_score
end
end
-- --
local filtered_members = {} local members
local count = 0
for i = 1, #members, 2 do
local member = members[i]
local score = members[i+1]
local include = true
if last_score ~= '' and last_member ~= '' then
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
-- score > last_score (score == last_score member > last_member) members = redis.call(command, KEYS[1], range_start, range_end,
if score == last_score then 'WITHSCORES', 'LIMIT', 0, limit)
include = member > last_member
else
include = tonumber(score) > tonumber(last_score)
end
else else
-- score < last_score (score == last_score member < last_member) members = redis.call(command, KEYS[1], range_end, range_start,
if score == last_score then 'WITHSCORES', 'LIMIT', 0, limit)
include = member < last_member
else
include = tonumber(score) < tonumber(last_score)
end
end end
end
if include then --
table.insert(filtered_members, member) local result_members = {}
table.insert(filtered_members, score) local result_scores = {}
count = count + 1 for i = 1, #members, 2 do
if count >= limit then table.insert(result_members, members[i])
break table.insert(result_scores, members[i+1])
end end
end
end
-- -- Hash数据
local result_members, result_scores = {}, {} local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
for i=1,#filtered_members,2 do
table.insert(result_members, filtered_members[i])
table.insert(result_scores, filtered_members[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end return {
#result_members,
result_members,
result_scores,
hash_data
}";
-- Hash数据 //正确设置范围参数
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 调整范围构造逻辑(移除排他符号)
string rangeStart, rangeEnd; string rangeStart, rangeEnd;
if (descending) if (descending)
{ {
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf";
rangeEnd = "-inf"; rangeEnd = "-inf"; // 降序时固定为最小值
} }
else else
{ {
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf";
rangeEnd = "+inf"; rangeEnd = "+inf"; // 升序时固定为最大值
} }
var scriptResult = (object[])await Instance.EvalAsync(luaScript, var result = (object[])await Instance.EvalAsync(
luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[] new object[]
{ {
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart, rangeStart,
rangeEnd, rangeEnd,
(pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页 (pageSize + 1).ToString(), // 多取1条用于判断hasNext
lastScore?.ToString() ?? "", lastScore?.ToString() ?? "",
lastMember ?? "" lastMember ?? ""
}); });
if ((long)scriptResult[0] == 0) if ((long)result[0] == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() }; return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集 // 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList(); var members = ((object[])result[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList(); var scores = ((object[])result[2]).Cast<string>().Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList(); var hashData = ((object[])result[3]).Cast<string>().ToList();
var validItems = members.Select((m, i) => //合并有效数据并处理游标
{ var validItems = members.Zip(hashData, (m, h) =>
try !string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize<T>(h) : null)
{ .Where(x => x != null)
return !string.IsNullOrEmpty(hashData[i]) .Take(pageSize + 1)
? BusJsonSerializer.Deserialize<T>(hashData[i]) .ToList();
: null;
}
catch (Exception ex)
{
_logger.LogError($"反序列化失败: {m} - {ex.Message}");
return null;
}
}).Where(x => x != null).ToList();
var hasNext = validItems.Count > pageSize; var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems; var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
// 修正分页锚点索引 // 计算下一页起始点
decimal? nextScore = null;
string nextMember = null; string nextMember = null;
if (hasNext && actualItems.Any()) decimal? nextScore = null;
if (hasNext)
{ {
var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引 // 获取实际返回的最后一条有效数据
nextScore = scores[lastIndex]; var lastValidIndex = actualItems.Count() - 1;
nextMember = members[lastIndex]; nextMember = members[lastValidIndex];
nextScore = scores[lastValidIndex];
} }
return new BusCacheGlobalPagedResult<T> return new BusCacheGlobalPagedResult<T>

View File

@ -238,7 +238,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var timer1 = Stopwatch.StartNew();
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext; bool hasNext;
@ -258,9 +258,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
hasNext = page.HasNext; hasNext = page.HasNext;
} while (hasNext); } while (hasNext);
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif

View File

@ -89,26 +89,5 @@ namespace JiShe.CollectBus.Common.Extensions
if (buffer.Count > 0) if (buffer.Count > 0)
yield return buffer; yield return buffer;
} }
//public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
//{
// if (batchSize <= 0)
// throw new ArgumentOutOfRangeException(nameof(batchSize));
// using var enumerator = source.GetEnumerator();
// while (enumerator.MoveNext())
// {
// yield return GetBatch(enumerator, batchSize);
// }
//}
//private static IEnumerable<T> GetBatch<T>(IEnumerator<T> enumerator, int batchSize)
//{
// do
// {
// yield return enumerator.Current;
// batchSize--;
// } while (batchSize > 0 && enumerator.MoveNext());
//}
} }
} }

View File

@ -1,5 +1,4 @@
using FreeSql.DataAnnotations; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Common.Models;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -10,18 +9,6 @@ namespace JiShe.CollectBus.Ammeters
{ {
public class AmmeterInfo: DeviceCacheBasicModel public class AmmeterInfo: DeviceCacheBasicModel
{ {
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
[Column(IsIgnore = true)]
public override string MemberID => $"{FocusId}:{MeterId}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
[Column(IsIgnore = true)]
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary> /// <summary>
/// 电表名称 /// 电表名称
/// </summary> /// </summary>