Compare commits
2 Commits
1a275ec9c3
...
78127f7cea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78127f7cea | ||
|
|
849e0f9ac2 |
@ -12,6 +12,9 @@ using System.Linq;
|
|||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Volo.Abp.DependencyInjection;
|
using Volo.Abp.DependencyInjection;
|
||||||
|
using static FreeSql.Internal.GlobalFilter;
|
||||||
|
using static System.Runtime.InteropServices.JavaScript.JSType;
|
||||||
|
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.RedisDataCache
|
namespace JiShe.CollectBus.RedisDataCache
|
||||||
{
|
{
|
||||||
@ -382,18 +385,16 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
bool descending = true)
|
bool descending = true)
|
||||||
where T : DeviceCacheBasicModel
|
where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
// 参数校验(保持不变)
|
// 参数校验
|
||||||
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
||||||
|
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
_logger.LogError("参数异常: HashKey或ZSetKey为空");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pageSize < 1 || pageSize > 10000)
|
if (pageSize < 1 || pageSize > 10000)
|
||||||
{
|
throw new ArgumentException("分页大小应在1-10000之间");
|
||||||
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var luaScript = @"
|
var luaScript = @"
|
||||||
local command = ARGV[1]
|
local command = ARGV[1]
|
||||||
@ -403,98 +404,120 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
local last_score = ARGV[5]
|
local last_score = ARGV[5]
|
||||||
local last_member = ARGV[6]
|
local last_member = ARGV[6]
|
||||||
|
|
||||||
-- 处理相同分数下的字典序分页
|
-- 获取原始数据
|
||||||
if last_score ~= '' and last_member ~= '' then
|
|
||||||
if command == 'ZRANGEBYSCORE' then
|
|
||||||
range_start = '(' .. last_score
|
|
||||||
range_end = '(' .. last_score .. ' ' .. last_member
|
|
||||||
else
|
|
||||||
range_start = '(' .. last_score .. ' ' .. last_member
|
|
||||||
range_end = '(' .. last_score
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
-- 执行范围查询
|
|
||||||
local members
|
local members
|
||||||
if command == 'ZRANGEBYSCORE' then
|
if command == 'ZRANGEBYSCORE' then
|
||||||
members = redis.call(command, KEYS[1], range_start, range_end,
|
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
||||||
'WITHSCORES', 'LIMIT', 0, limit)
|
|
||||||
else
|
else
|
||||||
members = redis.call(command, KEYS[1], range_end, range_start,
|
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
||||||
'WITHSCORES', 'LIMIT', 0, limit)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
-- 提取成员和分数
|
-- 过滤数据
|
||||||
local result_members = {}
|
local filtered_members = {}
|
||||||
local result_scores = {}
|
local count = 0
|
||||||
for i = 1, #members, 2 do
|
for i = 1, #members, 2 do
|
||||||
table.insert(result_members, members[i])
|
local member = members[i]
|
||||||
table.insert(result_scores, members[i+1])
|
local score = members[i+1]
|
||||||
|
local include = true
|
||||||
|
if last_score ~= '' and last_member ~= '' then
|
||||||
|
if command == 'ZRANGEBYSCORE' then
|
||||||
|
-- 升序:score > last_score 或 (score == last_score 且 member > last_member)
|
||||||
|
if score == last_score then
|
||||||
|
include = member > last_member
|
||||||
|
else
|
||||||
|
include = tonumber(score) > tonumber(last_score)
|
||||||
end
|
end
|
||||||
|
else
|
||||||
|
-- 降序:score < last_score 或 (score == last_score 且 member < last_member)
|
||||||
|
if score == last_score then
|
||||||
|
include = member < last_member
|
||||||
|
else
|
||||||
|
include = tonumber(score) < tonumber(last_score)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if include then
|
||||||
|
table.insert(filtered_members, member)
|
||||||
|
table.insert(filtered_members, score)
|
||||||
|
count = count + 1
|
||||||
|
if count >= limit then
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 提取有效数据
|
||||||
|
local result_members, result_scores = {}, {}
|
||||||
|
for i=1,#filtered_members,2 do
|
||||||
|
table.insert(result_members, filtered_members[i])
|
||||||
|
table.insert(result_scores, filtered_members[i+1])
|
||||||
|
end
|
||||||
|
|
||||||
|
if #result_members == 0 then return {0,{},{},{}} end
|
||||||
|
|
||||||
-- 获取Hash数据
|
-- 获取Hash数据
|
||||||
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
||||||
|
return {#result_members, result_members, result_scores, hash_data}";
|
||||||
|
|
||||||
return {
|
// 调整范围构造逻辑(移除排他符号)
|
||||||
#result_members,
|
|
||||||
result_members,
|
|
||||||
result_scores,
|
|
||||||
hash_data
|
|
||||||
}";
|
|
||||||
|
|
||||||
//正确设置范围参数
|
|
||||||
string rangeStart, rangeEnd;
|
string rangeStart, rangeEnd;
|
||||||
if (descending)
|
if (descending)
|
||||||
{
|
{
|
||||||
rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf";
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
|
||||||
rangeEnd = "-inf"; // 降序时固定为最小值
|
rangeEnd = "-inf";
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf";
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
|
||||||
rangeEnd = "+inf"; // 升序时固定为最大值
|
rangeEnd = "+inf";
|
||||||
}
|
}
|
||||||
|
|
||||||
var result = (object[])await Instance.EvalAsync(
|
var scriptResult = (object[])await Instance.EvalAsync(luaScript,
|
||||||
luaScript,
|
|
||||||
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
||||||
new object[]
|
new object[]
|
||||||
{
|
{
|
||||||
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
||||||
rangeStart,
|
rangeStart,
|
||||||
rangeEnd,
|
rangeEnd,
|
||||||
(pageSize + 1).ToString(), // 多取1条用于判断hasNext
|
(pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
|
||||||
lastScore?.ToString() ?? "",
|
lastScore?.ToString() ?? "",
|
||||||
lastMember ?? ""
|
lastMember ?? ""
|
||||||
});
|
});
|
||||||
|
|
||||||
if ((long)result[0] == 0)
|
if ((long)scriptResult[0] == 0)
|
||||||
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
||||||
|
|
||||||
// 处理结果集
|
// 处理结果集
|
||||||
var members = ((object[])result[1]).Cast<string>().ToList();
|
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
|
||||||
var scores = ((object[])result[2]).Cast<string>().Select(decimal.Parse).ToList();
|
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
|
||||||
var hashData = ((object[])result[3]).Cast<string>().ToList();
|
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
||||||
|
|
||||||
//合并有效数据并处理游标
|
var validItems = members.Select((m, i) =>
|
||||||
var validItems = members.Zip(hashData, (m, h) =>
|
{
|
||||||
!string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize<T>(h) : null)
|
try
|
||||||
.Where(x => x != null)
|
{
|
||||||
.Take(pageSize + 1)
|
return !string.IsNullOrEmpty(hashData[i])
|
||||||
.ToList();
|
? BusJsonSerializer.Deserialize<T>(hashData[i])
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError($"反序列化失败: {m} - {ex.Message}");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).Where(x => x != null).ToList();
|
||||||
|
|
||||||
var hasNext = validItems.Count > pageSize;
|
var hasNext = validItems.Count > pageSize;
|
||||||
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
||||||
|
|
||||||
// 计算下一页起始点
|
// 修正分页锚点索引
|
||||||
string nextMember = null;
|
|
||||||
decimal? nextScore = null;
|
decimal? nextScore = null;
|
||||||
if (hasNext)
|
string nextMember = null;
|
||||||
|
if (hasNext && actualItems.Any())
|
||||||
{
|
{
|
||||||
// 获取实际返回的最后一条有效数据
|
var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
|
||||||
var lastValidIndex = actualItems.Count() - 1;
|
nextScore = scores[lastIndex];
|
||||||
nextMember = members[lastValidIndex];
|
nextMember = members[lastIndex];
|
||||||
nextScore = scores[lastValidIndex];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return new BusCacheGlobalPagedResult<T>
|
return new BusCacheGlobalPagedResult<T>
|
||||||
|
|||||||
@ -238,7 +238,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
|
|
||||||
|
var timer1 = Stopwatch.StartNew();
|
||||||
decimal? cursor = null;
|
decimal? cursor = null;
|
||||||
string member = null;
|
string member = null;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
@ -258,6 +258,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
hasNext = page.HasNext;
|
hasNext = page.HasNext;
|
||||||
} while (hasNext);
|
} while (hasNext);
|
||||||
|
|
||||||
|
timer1.Stop();
|
||||||
|
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||||
|
|
||||||
#else
|
#else
|
||||||
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions
|
|||||||
if (buffer.Count > 0)
|
if (buffer.Count > 0)
|
||||||
yield return buffer;
|
yield return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
|
||||||
|
//{
|
||||||
|
// if (batchSize <= 0)
|
||||||
|
// throw new ArgumentOutOfRangeException(nameof(batchSize));
|
||||||
|
|
||||||
|
// using var enumerator = source.GetEnumerator();
|
||||||
|
// while (enumerator.MoveNext())
|
||||||
|
// {
|
||||||
|
// yield return GetBatch(enumerator, batchSize);
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
//private static IEnumerable<T> GetBatch<T>(IEnumerator<T> enumerator, int batchSize)
|
||||||
|
//{
|
||||||
|
// do
|
||||||
|
// {
|
||||||
|
// yield return enumerator.Current;
|
||||||
|
// batchSize--;
|
||||||
|
// } while (batchSize > 0 && enumerator.MoveNext());
|
||||||
|
//}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
using JiShe.CollectBus.Common.Models;
|
using FreeSql.DataAnnotations;
|
||||||
|
using JiShe.CollectBus.Common.Models;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
@ -9,6 +10,18 @@ namespace JiShe.CollectBus.Ammeters
|
|||||||
{
|
{
|
||||||
public class AmmeterInfo: DeviceCacheBasicModel
|
public class AmmeterInfo: DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
||||||
|
/// </summary>
|
||||||
|
[Column(IsIgnore = true)]
|
||||||
|
public override string MemberID => $"{FocusId}:{MeterId}";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
||||||
|
/// </summary>
|
||||||
|
[Column(IsIgnore = true)]
|
||||||
|
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 电表名称
|
/// 电表名称
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user