修改代码

This commit is contained in:
ChenYi 2025-04-16 23:51:27 +08:00
parent 1a275ec9c3
commit 849e0f9ac2
5 changed files with 201 additions and 121 deletions

View File

@ -12,6 +12,9 @@ using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
namespace JiShe.CollectBus.RedisDataCache namespace JiShe.CollectBus.RedisDataCache
{ {
@ -383,18 +386,17 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel where T : DeviceCacheBasicModel
{ {
// 参数校验(保持不变) // 参数校验(保持不变)
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{ {
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); _logger.LogError("参数异常: HashKey或ZSetKey为空");
return null; return null;
} }
if (pageSize < 1 || pageSize > 10000) if (pageSize < 1 || pageSize > 10000)
{ throw new ArgumentException("分页大小应在1-10000之间");
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
return null;
}
// 更新后的Lua脚本
var luaScript = @" var luaScript = @"
local command = ARGV[1] local command = ARGV[1]
local range_start = ARGV[2] local range_start = ARGV[2]
@ -403,98 +405,142 @@ namespace JiShe.CollectBus.RedisDataCache
local last_score = ARGV[5] local last_score = ARGV[5]
local last_member = ARGV[6] local last_member = ARGV[6]
-- -- range_startlast_score且没有last_member时
if last_score ~= '' and last_member ~= '' then if last_score ~= '' and last_member == '' then
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
range_start = '('..last_score range_start = '('..last_score
range_end = '(' .. last_score .. ' ' .. last_member
else else
range_start = '(' .. last_score .. ' ' .. last_member range_start = '('..last_score
range_end = '(' .. last_score
end end
end end
--
local members local members
if command == 'ZRANGEBYSCORE' then if command == 'ZRANGEBYSCORE' then
members = redis.call(command, KEYS[1], range_start, range_end, members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit)
'WITHSCORES', 'LIMIT', 0, limit)
else else
members = redis.call(command, KEYS[1], range_end, range_start, members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit)
'WITHSCORES', 'LIMIT', 0, limit)
end end
-- if #members == 0 then return {0,{},{},{}} end
local result_members = {} local result_members = {}
local result_scores = {} local result_scores = {}
-- last_member
if last_member ~= '' and last_score ~= '' then
for i = 1, #members, 2 do
local member = members[i]
local score = members[i+1]
local include = true
if score == last_score then
if command == 'ZRANGEBYSCORE' then
-- member必须 > last_member
if member <= last_member then
include = false
end
else
-- member必须 < last_member
if member >= last_member then
include = false
end
end
end
if include then
table.insert(result_members, member)
table.insert(result_scores, score)
end
end
else
for i = 1, #members, 2 do for i = 1, #members, 2 do
table.insert(result_members, members[i]) table.insert(result_members, members[i])
table.insert(result_scores, members[i+1]) table.insert(result_scores, members[i+1])
end end
end
-- limit条
local count = #result_members
if count > limit then
result_members = {unpack(result_members, 1, limit)}
result_scores = {unpack(result_scores, 1, limit)}
end
if #result_members == 0 then
return {0, {}, {}, {}}
end
-- Hash数据 -- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
return { // 修复点根据是否传递lastMember决定rangeStart是否排他
#result_members,
result_members,
result_scores,
hash_data
}";
//正确设置范围参数
string rangeStart, rangeEnd; string rangeStart, rangeEnd;
if (descending) if (descending)
{ {
rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf"; rangeStart = lastScore.HasValue
rangeEnd = "-inf"; // 降序时固定为最小值 ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
: "+inf";
rangeEnd = "-inf";
} }
else else
{ {
rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf"; rangeStart = lastScore.HasValue
rangeEnd = "+inf"; // 升序时固定为最大值 ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
: "-inf";
rangeEnd = "+inf";
} }
var result = (object[])await Instance.EvalAsync( // 执行Lua脚本保持不变
luaScript, var scriptResult = (object[])await Instance.EvalAsync(luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[] new object[]
{ {
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart, rangeStart,
rangeEnd, rangeEnd,
(pageSize + 1).ToString(), // 多取1条用于判断hasNext (pageSize + 1).ToString(),
lastScore?.ToString() ?? "", lastScore?.ToString() ?? "",
lastMember ?? "" lastMember ?? ""
}); });
if ((long)result[0] == 0) // 处理空结果(保持不变)
if ((long)scriptResult[0] == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() }; return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集 // 数据提取(保持不变)
var members = ((object[])result[1]).Cast<string>().ToList(); var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])result[2]).Cast<string>().Select(decimal.Parse).ToList(); var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
var hashData = ((object[])result[3]).Cast<string>().ToList(); var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
//合并有效数据并处理游标 // 反序列化处理(保持不变)
var validItems = members.Zip(hashData, (m, h) => var validItems = members.Select((m, i) =>
!string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize<T>(h) : null) {
.Where(x => x != null) try
.Take(pageSize + 1) {
.ToList(); return !string.IsNullOrEmpty(hashData[i])
? BusJsonSerializer.Deserialize<T>(hashData[i])
: null;
}
catch (Exception ex)
{
_logger.LogError($"反序列化失败: {m} - {ex.Message}");
return null;
}
}).Where(x => x != null).Take(pageSize + 1).ToList();
// 分页逻辑(保持不变)
var hasNext = validItems.Count > pageSize; var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems; var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
// 计算下一页起始点 // 计算下一页锚点(保持不变)
string nextMember = null;
decimal? nextScore = null; decimal? nextScore = null;
if (hasNext) string nextMember = null;
if (hasNext && actualItems.Any())
{ {
// 获取实际返回的最后一条有效数据 var lastIndex = Math.Min(members.Count - 1, pageSize);
var lastValidIndex = actualItems.Count() - 1; nextScore = scores[lastIndex];
nextMember = members[lastValidIndex]; nextMember = members[lastIndex];
nextScore = scores[lastValidIndex];
} }
return new BusCacheGlobalPagedResult<T> return new BusCacheGlobalPagedResult<T>

View File

@ -213,50 +213,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG #if DEBUG
//var timeDensity = "15"; var timeDensity = "15";
//string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
////获取缓存中的电表信息 //获取缓存中的电表信息
//var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
//var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); var tempMeterInfos = await GetMeterRedisCacheListData<AmmeterInfoTemp>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
////List<string> focusAddressDataLista = new List<string>(); //List<string> focusAddressDataLista = new List<string>();
//List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
//foreach (var item in tempMeterInfos) foreach (var item in tempMeterInfos)
//{ {
// var tempData = item.Adapt<AmmeterInfo>(); var tempData = item.Adapt<AmmeterInfo>();
// tempData.FocusId = item.FocusID; tempData.FocusId = item.FocusID;
// tempData.MeterId = item.Id; tempData.MeterId = item.Id;
// meterInfos.Add(tempData); meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress); //focusAddressDataLista.Add(item.FocusAddress);
//} }
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
var timeDensity = "15"; //var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
decimal? cursor = null; //decimal? cursor = null;
string member = null; //string member = null;
bool hasNext; //bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); //List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do //do
{ //{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( // var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, // redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, // redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000, // pageSize: 1000,
lastScore: cursor, // lastScore: cursor,
lastMember: member); // lastMember: member);
meterInfos.AddRange(page.Items); // meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null; // cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null; // member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext; // hasNext = page.HasNext;
} while (hasNext); //} while (hasNext);
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);

View File

@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions
if (buffer.Count > 0) if (buffer.Count > 0)
yield return buffer; yield return buffer;
} }
//public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
//{
// if (batchSize <= 0)
// throw new ArgumentOutOfRangeException(nameof(batchSize));
// using var enumerator = source.GetEnumerator();
// while (enumerator.MoveNext())
// {
// yield return GetBatch(enumerator, batchSize);
// }
//}
//private static IEnumerable<T> GetBatch<T>(IEnumerator<T> enumerator, int batchSize)
//{
// do
// {
// yield return enumerator.Current;
// batchSize--;
// } while (batchSize > 0 && enumerator.MoveNext());
//}
} }
} }

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Models; using FreeSql.DataAnnotations;
using JiShe.CollectBus.Common.Models;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -9,6 +10,18 @@ namespace JiShe.CollectBus.Ammeters
{ {
public class AmmeterInfo: DeviceCacheBasicModel public class AmmeterInfo: DeviceCacheBasicModel
{ {
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
[Column(IsIgnore = true)]
public override string MemberID => $"{FocusId}:{MeterId}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
[Column(IsIgnore = true)]
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary> /// <summary>
/// 电表名称 /// 电表名称
/// </summary> /// </summary>

View File

@ -129,7 +129,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus3", "ServerTagName": "JiSheCollectBus2",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30 "NumPartitions": 30
} }