2025-04-16 17:36:46 +08:00
|
|
|
|
using Confluent.Kafka;
|
|
|
|
|
|
using FreeRedis;
|
|
|
|
|
|
using JiShe.CollectBus.Application.Contracts;
|
|
|
|
|
|
using JiShe.CollectBus.Common.Extensions;
|
|
|
|
|
|
using JiShe.CollectBus.Common.Helpers;
|
|
|
|
|
|
using JiShe.CollectBus.Common.Models;
|
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
|
|
|
|
using System;
|
2025-04-17 09:10:01 +08:00
|
|
|
|
using System.Collections.Concurrent;
|
2025-04-16 17:36:46 +08:00
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
|
using System.Linq;
|
|
|
|
|
|
using System.Threading;
|
|
|
|
|
|
using System.Threading.Tasks;
|
2025-04-17 20:28:50 +08:00
|
|
|
|
using JiShe.CollectBus.FreeRedis;
|
2025-04-16 17:36:46 +08:00
|
|
|
|
using Volo.Abp.DependencyInjection;
|
2025-04-16 23:51:27 +08:00
|
|
|
|
using static FreeSql.Internal.GlobalFilter;
|
|
|
|
|
|
using static System.Runtime.InteropServices.JavaScript.JSType;
|
|
|
|
|
|
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
|
2025-05-12 23:18:02 +08:00
|
|
|
|
using JiShe.CollectBus.IotSystems.Ammeters;
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
namespace JiShe.CollectBus.RedisDataCache
|
|
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 数据缓存服务接口
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
public class RedisDataCacheService : IRedisDataCacheService, ITransientDependency
|
|
|
|
|
|
{
|
|
|
|
|
|
private readonly IFreeRedisProvider _freeRedisProvider;
|
|
|
|
|
|
private readonly ILogger<RedisDataCacheService> _logger;
|
|
|
|
|
|
private RedisClient Instance { get; set; }
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 数据缓存服务接口
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="freeRedisProvider"></param>
|
|
|
|
|
|
/// <param name="logger"></param>
|
|
|
|
|
|
public RedisDataCacheService(IFreeRedisProvider freeRedisProvider,
|
|
|
|
|
|
ILogger<RedisDataCacheService> logger)
|
|
|
|
|
|
{
|
|
|
|
|
|
this._freeRedisProvider = freeRedisProvider;
|
|
|
|
|
|
this._logger = logger;
|
|
|
|
|
|
|
|
|
|
|
|
Instance = _freeRedisProvider.Instance;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 单个添加数据
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="data">待缓存数据</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task InsertDataAsync<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisSetIndexCacheKey,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
T data) where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
// 参数校验增强
|
|
|
|
|
|
if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 使用事务保证原子性
|
|
|
|
|
|
using (var trans = Instance.Multi())
|
|
|
|
|
|
{
|
|
|
|
|
|
// 主数据存储Hash
|
2025-04-17 11:29:26 +08:00
|
|
|
|
trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
// 集中器号分组索引Set缓存
|
2025-04-17 11:29:26 +08:00
|
|
|
|
trans.SAdd(redisSetIndexCacheKey, data.MemberId);
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
// 集中器与表计信息排序索引ZSET缓存Key
|
2025-04-17 11:29:26 +08:00
|
|
|
|
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
var results = trans.Exec();
|
|
|
|
|
|
|
|
|
|
|
|
if (results == null || results.Length <= 0)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(InsertDataAsync)} 添加事务提交失败,-102");
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
await Task.CompletedTask;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 批量添加数据
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
|
2025-05-13 14:51:38 +08:00
|
|
|
|
/// <param name="redisDeviceInfoHashCacheKey">hash缓存Key</param>
|
2025-04-16 17:36:46 +08:00
|
|
|
|
/// <param name="items">待缓存数据集合</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task BatchInsertDataAsync<T>(
|
2025-05-12 23:18:02 +08:00
|
|
|
|
string redisSetIndexCacheKey,
|
|
|
|
|
|
string redisDeviceInfoHashCacheKey,
|
|
|
|
|
|
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
if (items == null
|
|
|
|
|
|
|| items.Count() <= 0
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
|
|
|
|
|
|
)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const int BATCH_SIZE = 1000; // 每批1000条
|
|
|
|
|
|
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
|
|
|
|
|
|
|
|
|
|
|
|
foreach (var batch in items.Batch(BATCH_SIZE))
|
|
|
|
|
|
{
|
|
|
|
|
|
await semaphore.WaitAsync();
|
|
|
|
|
|
|
|
|
|
|
|
_ = Task.Run(() =>
|
|
|
|
|
|
{
|
|
|
|
|
|
using (var pipe = Instance.StartPipe())
|
|
|
|
|
|
{
|
|
|
|
|
|
foreach (var item in batch)
|
|
|
|
|
|
{
|
|
|
|
|
|
// Set索引缓存
|
|
|
|
|
|
pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}");
|
|
|
|
|
|
|
|
|
|
|
|
//设备信息缓存
|
|
|
|
|
|
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize());
|
|
|
|
|
|
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
pipe.EndPipe();
|
|
|
|
|
|
}
|
|
|
|
|
|
semaphore.Release();
|
|
|
|
|
|
});
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
await Task.CompletedTask;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 删除缓存信息
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="data">已缓存数据</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task RemoveCacheDataAsync<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisSetIndexCacheKey,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
T data) where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
if (data == null
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
const string luaScript = @"
|
|
|
|
|
|
local hashCacheKey = KEYS[1]
|
|
|
|
|
|
local setIndexCacheKey = KEYS[2]
|
|
|
|
|
|
local zsetScoresIndexCacheKey = KEYS[3]
|
|
|
|
|
|
local member = ARGV[1]
|
|
|
|
|
|
|
|
|
|
|
|
local deleted = 0
|
|
|
|
|
|
if redis.call('HDEL', hashCacheKey, member) > 0 then
|
|
|
|
|
|
deleted = 1
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
redis.call('SREM', setIndexCacheKey, member)
|
|
|
|
|
|
redis.call('ZREM', zsetScoresIndexCacheKey, member)
|
|
|
|
|
|
return deleted
|
|
|
|
|
|
";
|
|
|
|
|
|
|
|
|
|
|
|
var keys = new[]
|
|
|
|
|
|
{
|
|
|
|
|
|
redisHashCacheKey,
|
|
|
|
|
|
redisSetIndexCacheKey,
|
|
|
|
|
|
redisZSetScoresIndexCacheKey
|
|
|
|
|
|
};
|
|
|
|
|
|
|
2025-04-17 11:29:26 +08:00
|
|
|
|
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
if ((int)result == 0)
|
|
|
|
|
|
{
|
2025-04-17 11:29:26 +08:00
|
|
|
|
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 修改缓存信息,映射关系未发生改变
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="newData">待修改缓存数据</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task ModifyDataAsync<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisSetIndexCacheKey,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
T newData) where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
if (newData == null
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var luaScript = @"
|
|
|
|
|
|
local hashCacheKey = KEYS[1]
|
|
|
|
|
|
local member = ARGV[1]
|
|
|
|
|
|
local newData = ARGV[2]
|
|
|
|
|
|
|
|
|
|
|
|
-- 校验存在性
|
|
|
|
|
|
if redis.call('HEXISTS', hashCacheKey, member) == 0 then
|
|
|
|
|
|
return 0
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 更新主数据
|
|
|
|
|
|
redis.call('HSET', hashCacheKey, member, newData)
|
|
|
|
|
|
|
|
|
|
|
|
return 1
|
|
|
|
|
|
";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var result = await Instance.EvalAsync(luaScript,
|
|
|
|
|
|
new[]
|
|
|
|
|
|
{
|
|
|
|
|
|
redisHashCacheKey
|
|
|
|
|
|
},
|
|
|
|
|
|
new object[]
|
|
|
|
|
|
{
|
2025-04-17 11:29:26 +08:00
|
|
|
|
newData.MemberId,
|
2025-04-16 17:36:46 +08:00
|
|
|
|
newData.Serialize()
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
if ((int)result == 0)
|
|
|
|
|
|
{
|
2025-04-17 11:29:26 +08:00
|
|
|
|
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 修改缓存信息,映射关系已经改变
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
|
|
|
|
|
|
/// <param name="oldMemberId">旧的映射关系</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="newData">待修改缓存数据</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task ModifyDataAsync<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisSetIndexCacheKey,
|
|
|
|
|
|
string oldMemberId,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
T newData) where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
if (newData == null
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(oldMemberId)
|
|
|
|
|
|
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var luaScript = @"
|
|
|
|
|
|
local hashCacheKey = KEYS[1]
|
|
|
|
|
|
local setIndexCacheKey = KEYS[2]
|
|
|
|
|
|
local zsetScoresIndexCacheKey = KEYS[3]
|
|
|
|
|
|
local member = ARGV[1]
|
|
|
|
|
|
local oldMember = ARGV[2]
|
|
|
|
|
|
local newData = ARGV[3]
|
|
|
|
|
|
local newScore = ARGV[4]
|
|
|
|
|
|
|
|
|
|
|
|
-- 校验存在性
|
|
|
|
|
|
if redis.call('HEXISTS', hashCacheKey, oldMember) == 0 then
|
|
|
|
|
|
return 0
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 删除旧数据
|
|
|
|
|
|
redis.call('HDEL', hashCacheKey, oldMember)
|
|
|
|
|
|
|
|
|
|
|
|
-- 插入新主数据
|
|
|
|
|
|
redis.call('HSET', hashCacheKey, member, newData)
|
|
|
|
|
|
|
|
|
|
|
|
-- 处理变更
|
|
|
|
|
|
if newScore ~= '' then
|
|
|
|
|
|
-- 删除旧索引
|
|
|
|
|
|
redis.call('SREM', setIndexCacheKey, oldMember)
|
|
|
|
|
|
redis.call('ZREM', zsetScoresIndexCacheKey, oldMember)
|
|
|
|
|
|
|
|
|
|
|
|
-- 添加新索引
|
|
|
|
|
|
redis.call('SADD', setIndexCacheKey, member)
|
|
|
|
|
|
redis.call('ZADD', zsetScoresIndexCacheKey, newScore, member)
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
return 1
|
|
|
|
|
|
";
|
|
|
|
|
|
|
|
|
|
|
|
var result = await Instance.EvalAsync(luaScript,
|
|
|
|
|
|
new[]
|
|
|
|
|
|
{
|
|
|
|
|
|
redisHashCacheKey,
|
|
|
|
|
|
redisSetIndexCacheKey,
|
|
|
|
|
|
redisZSetScoresIndexCacheKey
|
|
|
|
|
|
},
|
|
|
|
|
|
new object[]
|
|
|
|
|
|
{
|
2025-04-17 11:29:26 +08:00
|
|
|
|
newData.MemberId,
|
2025-04-16 17:36:46 +08:00
|
|
|
|
oldMemberId,
|
|
|
|
|
|
newData.Serialize(),
|
|
|
|
|
|
newData.ScoreValue.ToString() ?? "",
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
if ((int)result == 0)
|
|
|
|
|
|
{
|
2025-04-17 11:29:26 +08:00
|
|
|
|
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 通过集中器与表计信息排序索引获取指定集中器号集合数据
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisCacheFocusScoresIndexKey">集中器与表计信息排序索引ZSET缓存Key</param>
|
|
|
|
|
|
/// <param name="focusIds">集中器Id</param>
|
|
|
|
|
|
/// <param name="pageSize">分页尺寸</param>
|
|
|
|
|
|
/// <param name="lastScore">最后一个索引</param>
|
|
|
|
|
|
/// <param name="lastMember">最后一个唯一标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
|
|
|
|
|
|
string redisCacheKey,
|
|
|
|
|
|
string redisCacheFocusScoresIndexKey,
|
|
|
|
|
|
IEnumerable<int> focusIds,
|
|
|
|
|
|
int pageSize = 10,
|
|
|
|
|
|
decimal? lastScore = null,
|
|
|
|
|
|
string lastMember = null,
|
|
|
|
|
|
bool descending = true)
|
|
|
|
|
|
where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
throw new Exception();
|
2025-04-17 11:29:26 +08:00
|
|
|
|
}
|
2025-04-29 23:48:47 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
2025-05-11 21:53:55 +08:00
|
|
|
|
/// 通过集中器与表计信息排序索引获取单个数据
|
2025-04-29 23:48:47 +08:00
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="scoreValueRawData">ZSET索引的原始数据,例如集中地址和点位的组合</param>
|
|
|
|
|
|
/// <param name="pageSize">分页尺寸</param>
|
|
|
|
|
|
/// <param name="lastScore">最后一个索引</param>
|
|
|
|
|
|
/// <param name="lastMember">最后一个唯一标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
|
|
|
|
|
/// <returns></returns>
|
2025-05-11 21:53:55 +08:00
|
|
|
|
public async Task<BusCacheGlobalPagedResult<T>> GetSingleData<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
string scoreValueRawData,
|
|
|
|
|
|
int pageSize = 10,
|
|
|
|
|
|
decimal? lastScore = null,
|
|
|
|
|
|
string lastMember = null,
|
|
|
|
|
|
bool descending = true)
|
|
|
|
|
|
where T : DeviceCacheBasicModel
|
2025-04-29 23:48:47 +08:00
|
|
|
|
{
|
2025-05-11 21:53:55 +08:00
|
|
|
|
// 参数校验
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
|
|
|
|
|
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey) ||
|
|
|
|
|
|
string.IsNullOrWhiteSpace(scoreValueRawData))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(GetSingleData)} 参数异常,-101");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
2025-04-29 23:48:47 +08:00
|
|
|
|
|
2025-05-11 21:53:55 +08:00
|
|
|
|
// 解析原始数据
|
|
|
|
|
|
var rawDataArray = scoreValueRawData.Split(':');
|
|
|
|
|
|
if (rawDataArray.Length != 2)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(GetSingleData)} scoreValueRawData格式错误,应为[focusAddress:point]");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
2025-04-29 23:48:47 +08:00
|
|
|
|
|
2025-05-11 21:53:55 +08:00
|
|
|
|
// 计算Score值
|
|
|
|
|
|
long scoreValue;
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
var focusAddress = rawDataArray[0];
|
|
|
|
|
|
var point = Convert.ToInt32(rawDataArray[1]);
|
|
|
|
|
|
scoreValue = CommonHelper.GetFocusScores(focusAddress, point);
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError(ex, $"{nameof(GetSingleData)} 计算Score值失败");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
2025-04-29 23:48:47 +08:00
|
|
|
|
|
2025-05-11 21:53:55 +08:00
|
|
|
|
// Lua脚本:原子化查询操作
|
|
|
|
|
|
const string luaScript = @"
|
|
|
|
|
|
local zsetKey = KEYS[1]
|
|
|
|
|
|
local hashKey = KEYS[2]
|
|
|
|
|
|
local targetScore = ARGV[1]
|
|
|
|
|
|
|
|
|
|
|
|
-- 精确匹配Score并获取第一个成员
|
|
|
|
|
|
local members = redis.call('ZRANGEBYSCORE', zsetKey, targetScore, targetScore, 'LIMIT', 0, 1)
|
|
|
|
|
|
if #members == 0 then
|
|
|
|
|
|
return nil
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 获取哈希表数据
|
|
|
|
|
|
local member = members[1]
|
|
|
|
|
|
local data = redis.call('HGET', hashKey, member)
|
|
|
|
|
|
return {member, data}";
|
|
|
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
{
|
|
|
|
|
|
// 执行脚本
|
|
|
|
|
|
var result = await Instance.EvalAsync(
|
|
|
|
|
|
luaScript,
|
|
|
|
|
|
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
|
|
|
|
|
new object[] { scoreValue });
|
|
|
|
|
|
|
|
|
|
|
|
// 处理空结果
|
|
|
|
|
|
if (result == null)
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
|
|
|
|
|
|
// 解析Redis返回数据
|
|
|
|
|
|
var resultArray = (object[])result;
|
|
|
|
|
|
var memberId = (string)resultArray[0];
|
|
|
|
|
|
var dataStr = (string)resultArray[1];
|
|
|
|
|
|
|
|
|
|
|
|
// 反序列化数据
|
|
|
|
|
|
var data = BusJsonSerializer.Deserialize<T>(dataStr);
|
|
|
|
|
|
if (data == null)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(GetSingleData)} 反序列化失败,MemberId: {memberId}");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 构造返回结果
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T>
|
|
|
|
|
|
{
|
|
|
|
|
|
Items = new List<T> { data },
|
|
|
|
|
|
TotalCount = 1,
|
|
|
|
|
|
PageSize = 1,
|
|
|
|
|
|
HasNext = false,
|
|
|
|
|
|
NextScore = null,
|
|
|
|
|
|
NextMember = null
|
|
|
|
|
|
};
|
|
|
|
|
|
}
|
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError(ex, $"{nameof(GetSingleData)} Redis操作异常");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
2025-04-29 23:48:47 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-16 17:36:46 +08:00
|
|
|
|
/// <summary>
|
2025-04-17 11:29:26 +08:00
|
|
|
|
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
2025-04-16 17:36:46 +08:00
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="pageSize">分页尺寸</param>
|
|
|
|
|
|
/// <param name="lastScore">最后一个索引</param>
|
|
|
|
|
|
/// <param name="lastMember">最后一个唯一标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
2025-04-16 23:51:27 +08:00
|
|
|
|
/// <returns></returns>
|
2025-04-16 17:36:46 +08:00
|
|
|
|
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>(
|
2025-04-16 23:51:27 +08:00
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
int pageSize = 1000,
|
|
|
|
|
|
decimal? lastScore = null,
|
|
|
|
|
|
string lastMember = null,
|
|
|
|
|
|
bool descending = true)
|
|
|
|
|
|
where T : DeviceCacheBasicModel
|
2025-04-16 17:36:46 +08:00
|
|
|
|
{
|
2025-04-17 09:10:01 +08:00
|
|
|
|
// 参数校验增强
|
2025-04-17 15:49:57 +08:00
|
|
|
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
|
|
|
|
|
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
2025-04-16 17:36:46 +08:00
|
|
|
|
{
|
2025-04-17 09:10:01 +08:00
|
|
|
|
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
2025-04-17 15:49:57 +08:00
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-17 15:49:57 +08:00
|
|
|
|
pageSize = Math.Clamp(pageSize, 1, 10000);
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
var luaScript = @"
|
2025-04-17 09:10:01 +08:00
|
|
|
|
local command = ARGV[1]
|
|
|
|
|
|
local range_start = ARGV[2]
|
|
|
|
|
|
local range_end = ARGV[3]
|
|
|
|
|
|
local limit = tonumber(ARGV[4])
|
|
|
|
|
|
local last_score = ARGV[5]
|
|
|
|
|
|
local last_member = ARGV[6]
|
|
|
|
|
|
|
|
|
|
|
|
-- 获取原始数据
|
|
|
|
|
|
local members
|
|
|
|
|
|
if command == 'ZRANGEBYSCORE' then
|
|
|
|
|
|
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
|
|
|
|
|
else
|
|
|
|
|
|
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 过滤数据
|
|
|
|
|
|
local filtered_members = {}
|
|
|
|
|
|
local count = 0
|
|
|
|
|
|
for i = 1, #members, 2 do
|
|
|
|
|
|
local member = members[i]
|
|
|
|
|
|
local score = members[i+1]
|
|
|
|
|
|
local include = true
|
|
|
|
|
|
if last_score ~= '' and last_member ~= '' then
|
|
|
|
|
|
if command == 'ZRANGEBYSCORE' then
|
|
|
|
|
|
-- 升序:score > last_score 或 (score == last_score 且 member > last_member)
|
|
|
|
|
|
if score == last_score then
|
|
|
|
|
|
include = member > last_member
|
|
|
|
|
|
else
|
|
|
|
|
|
include = tonumber(score) > tonumber(last_score)
|
|
|
|
|
|
end
|
|
|
|
|
|
else
|
|
|
|
|
|
-- 降序:score < last_score 或 (score == last_score 且 member < last_member)
|
|
|
|
|
|
if score == last_score then
|
|
|
|
|
|
include = member < last_member
|
|
|
|
|
|
else
|
|
|
|
|
|
include = tonumber(score) < tonumber(last_score)
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
if include then
|
|
|
|
|
|
table.insert(filtered_members, member)
|
|
|
|
|
|
table.insert(filtered_members, score)
|
|
|
|
|
|
count = count + 1
|
|
|
|
|
|
if count >= limit then
|
|
|
|
|
|
break
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 提取有效数据
|
|
|
|
|
|
local result_members, result_scores = {}, {}
|
|
|
|
|
|
for i=1,#filtered_members,2 do
|
|
|
|
|
|
table.insert(result_members, filtered_members[i])
|
|
|
|
|
|
table.insert(result_scores, filtered_members[i+1])
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
if #result_members == 0 then return {0,{},{},{}} end
|
|
|
|
|
|
|
|
|
|
|
|
-- 获取Hash数据
|
|
|
|
|
|
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
|
|
|
|
|
return {#result_members, result_members, result_scores, hash_data}";
|
2025-04-16 23:51:27 +08:00
|
|
|
|
|
2025-04-17 08:20:54 +08:00
|
|
|
|
// 调整范围构造逻辑(移除排他符号)
|
2025-04-16 17:36:46 +08:00
|
|
|
|
string rangeStart, rangeEnd;
|
|
|
|
|
|
if (descending)
|
|
|
|
|
|
{
|
2025-04-17 08:20:54 +08:00
|
|
|
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
|
2025-04-16 23:51:27 +08:00
|
|
|
|
rangeEnd = "-inf";
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
else
|
|
|
|
|
|
{
|
2025-04-17 08:20:54 +08:00
|
|
|
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
|
2025-04-16 23:51:27 +08:00
|
|
|
|
rangeEnd = "+inf";
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-16 23:51:27 +08:00
|
|
|
|
var scriptResult = (object[])await Instance.EvalAsync(luaScript,
|
2025-04-16 17:36:46 +08:00
|
|
|
|
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
|
|
|
|
|
new object[]
|
|
|
|
|
|
{
|
2025-04-16 23:51:27 +08:00
|
|
|
|
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
|
|
|
|
|
rangeStart,
|
|
|
|
|
|
rangeEnd,
|
2025-04-17 08:20:54 +08:00
|
|
|
|
(pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
|
2025-04-16 23:51:27 +08:00
|
|
|
|
lastScore?.ToString() ?? "",
|
|
|
|
|
|
lastMember ?? ""
|
2025-04-16 17:36:46 +08:00
|
|
|
|
});
|
|
|
|
|
|
|
2025-04-16 23:51:27 +08:00
|
|
|
|
if ((long)scriptResult[0] == 0)
|
2025-04-16 17:36:46 +08:00
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
|
2025-04-17 08:20:54 +08:00
|
|
|
|
// 处理结果集
|
2025-04-16 23:51:27 +08:00
|
|
|
|
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
|
|
|
|
|
|
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
|
|
|
|
|
|
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
2025-04-17 08:29:32 +08:00
|
|
|
|
var validItems = members.AsParallel()
|
|
|
|
|
|
.Select((m, i) =>
|
2025-04-16 23:51:27 +08:00
|
|
|
|
{
|
2025-04-17 08:29:32 +08:00
|
|
|
|
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
|
|
|
|
|
|
catch { return null; }
|
|
|
|
|
|
})
|
|
|
|
|
|
.Where(x => x != null)
|
|
|
|
|
|
.ToList();
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
var hasNext = validItems.Count > pageSize;
|
|
|
|
|
|
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
|
|
|
|
|
|
2025-04-17 09:10:01 +08:00
|
|
|
|
//分页锚点索引
|
2025-04-16 17:36:46 +08:00
|
|
|
|
decimal? nextScore = null;
|
2025-04-16 23:51:27 +08:00
|
|
|
|
string nextMember = null;
|
|
|
|
|
|
if (hasNext && actualItems.Any())
|
2025-04-16 17:36:46 +08:00
|
|
|
|
{
|
2025-04-17 08:20:54 +08:00
|
|
|
|
var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
|
2025-04-16 23:51:27 +08:00
|
|
|
|
nextScore = scores[lastIndex];
|
|
|
|
|
|
nextMember = members[lastIndex];
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T>
|
|
|
|
|
|
{
|
|
|
|
|
|
Items = actualItems.ToList(),
|
|
|
|
|
|
HasNext = hasNext,
|
|
|
|
|
|
NextScore = nextScore,
|
|
|
|
|
|
NextMember = nextMember,
|
|
|
|
|
|
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
|
|
|
|
|
|
PageSize = pageSize,
|
2025-04-17 09:10:01 +08:00
|
|
|
|
};
|
2025-04-16 17:36:46 +08:00
|
|
|
|
}
|
2025-05-13 14:51:38 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisCacheDeviceGroupSetIndexKey">ZSET索引缓存Key</param>
|
|
|
|
|
|
/// <param name="redisCacheDeviceInfoHashKey">主数据存储Hash缓存Key</param>
|
|
|
|
|
|
/// <param name="pageSize">分页尺寸</param>
|
|
|
|
|
|
/// <param name="lastScore">最后一个索引</param>
|
|
|
|
|
|
/// <param name="lastMember">最后一个唯一标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData2<T>(
|
|
|
|
|
|
string redisCacheDeviceGroupSetIndexKey,
|
|
|
|
|
|
string redisCacheDeviceInfoHashKey,
|
|
|
|
|
|
int pageSize = 1000,
|
|
|
|
|
|
decimal? lastScore = null,
|
|
|
|
|
|
string lastMember = null,
|
|
|
|
|
|
bool descending = true)
|
|
|
|
|
|
where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
// 参数校验增强
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(redisCacheDeviceInfoHashKey) ||
|
|
|
|
|
|
string.IsNullOrWhiteSpace(redisCacheDeviceGroupSetIndexKey))
|
|
|
|
|
|
{
|
|
|
|
|
|
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pageSize = Math.Clamp(pageSize, 1, 10000);
|
|
|
|
|
|
|
|
|
|
|
|
var luaScript = @"
|
|
|
|
|
|
local command = ARGV[1]
|
|
|
|
|
|
local range_start = ARGV[2]
|
|
|
|
|
|
local range_end = ARGV[3]
|
|
|
|
|
|
local limit = tonumber(ARGV[4])
|
|
|
|
|
|
local last_score = ARGV[5]
|
|
|
|
|
|
local last_member = ARGV[6]
|
|
|
|
|
|
|
|
|
|
|
|
-- 获取原始数据
|
|
|
|
|
|
local members
|
|
|
|
|
|
if command == 'ZRANGEBYSCORE' then
|
|
|
|
|
|
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
|
|
|
|
|
else
|
|
|
|
|
|
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 过滤数据
|
|
|
|
|
|
local filtered_members = {}
|
|
|
|
|
|
local count = 0
|
|
|
|
|
|
for i = 1, #members, 2 do
|
|
|
|
|
|
local member = members[i]
|
|
|
|
|
|
local score = members[i+1]
|
|
|
|
|
|
local include = true
|
|
|
|
|
|
if last_score ~= '' and last_member ~= '' then
|
|
|
|
|
|
if command == 'ZRANGEBYSCORE' then
|
|
|
|
|
|
-- 升序:score > last_score 或 (score == last_score 且 member > last_member)
|
|
|
|
|
|
if score == last_score then
|
|
|
|
|
|
include = member > last_member
|
|
|
|
|
|
else
|
|
|
|
|
|
include = tonumber(score) > tonumber(last_score)
|
|
|
|
|
|
end
|
|
|
|
|
|
else
|
|
|
|
|
|
-- 降序:score < last_score 或 (score == last_score 且 member < last_member)
|
|
|
|
|
|
if score == last_score then
|
|
|
|
|
|
include = member < last_member
|
|
|
|
|
|
else
|
|
|
|
|
|
include = tonumber(score) < tonumber(last_score)
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
if include then
|
|
|
|
|
|
table.insert(filtered_members, member)
|
|
|
|
|
|
table.insert(filtered_members, score)
|
|
|
|
|
|
count = count + 1
|
|
|
|
|
|
if count >= limit then
|
|
|
|
|
|
break
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
-- 提取有效数据
|
|
|
|
|
|
local result_members, result_scores = {}, {}
|
|
|
|
|
|
for i=1,#filtered_members,2 do
|
|
|
|
|
|
table.insert(result_members, filtered_members[i])
|
|
|
|
|
|
table.insert(result_scores, filtered_members[i+1])
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
if #result_members == 0 then return {0,{},{},{}} end
|
|
|
|
|
|
|
|
|
|
|
|
-- 获取Hash数据
|
|
|
|
|
|
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
|
|
|
|
|
return {#result_members, result_members, result_scores, hash_data}";
|
|
|
|
|
|
|
|
|
|
|
|
// 调整范围构造逻辑(移除排他符号)
|
|
|
|
|
|
string rangeStart, rangeEnd;
|
|
|
|
|
|
if (descending)
|
|
|
|
|
|
{
|
|
|
|
|
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
|
|
|
|
|
|
rangeEnd = "-inf";
|
|
|
|
|
|
}
|
|
|
|
|
|
else
|
|
|
|
|
|
{
|
|
|
|
|
|
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
|
|
|
|
|
|
rangeEnd = "+inf";
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var scriptResult = (object[])await Instance.EvalAsync(luaScript,
|
|
|
|
|
|
new[] { redisCacheDeviceGroupSetIndexKey, redisCacheDeviceInfoHashKey },
|
|
|
|
|
|
new object[]
|
|
|
|
|
|
{
|
|
|
|
|
|
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
|
|
|
|
|
rangeStart,
|
|
|
|
|
|
rangeEnd,
|
|
|
|
|
|
(pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页
|
|
|
|
|
|
lastScore?.ToString() ?? "",
|
|
|
|
|
|
lastMember ?? ""
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
if ((long)scriptResult[0] == 0)
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
|
|
|
|
|
|
|
|
|
|
// 处理结果集
|
|
|
|
|
|
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
|
|
|
|
|
|
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
|
|
|
|
|
|
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
|
|
|
|
|
|
|
|
|
|
|
var validItems = members.AsParallel()
|
|
|
|
|
|
.Select((m, i) =>
|
|
|
|
|
|
{
|
|
|
|
|
|
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
|
|
|
|
|
|
catch { return null; }
|
|
|
|
|
|
})
|
|
|
|
|
|
.Where(x => x != null)
|
|
|
|
|
|
.ToList();
|
|
|
|
|
|
|
|
|
|
|
|
var hasNext = validItems.Count > pageSize;
|
|
|
|
|
|
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
|
|
|
|
|
|
|
|
|
|
|
//分页锚点索引
|
|
|
|
|
|
decimal? nextScore = null;
|
|
|
|
|
|
string nextMember = null;
|
|
|
|
|
|
if (hasNext && actualItems.Any())
|
|
|
|
|
|
{
|
|
|
|
|
|
var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引
|
|
|
|
|
|
nextScore = scores[lastIndex];
|
|
|
|
|
|
nextMember = members[lastIndex];
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return new BusCacheGlobalPagedResult<T>
|
|
|
|
|
|
{
|
|
|
|
|
|
Items = actualItems.ToList(),
|
|
|
|
|
|
HasNext = hasNext,
|
|
|
|
|
|
NextScore = nextScore,
|
|
|
|
|
|
NextMember = nextMember,
|
|
|
|
|
|
TotalCount = await GetTotalCount(redisCacheDeviceGroupSetIndexKey),
|
|
|
|
|
|
PageSize = pageSize,
|
|
|
|
|
|
};
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-04-16 17:36:46 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 游标分页查询
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
|
|
|
|
|
|
/// <param name="pageSize">分页数量</param>
|
|
|
|
|
|
/// <param name="lastScore">上一个索引</param>
|
|
|
|
|
|
/// <param name="lastMember">上一个标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
private async Task<(List<string> Members, bool HasNext, decimal? NextScore, string NextMember)> GetPagedMembers(
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
int pageSize,
|
|
|
|
|
|
decimal? lastScore,
|
|
|
|
|
|
string lastMember,
|
|
|
|
|
|
bool descending)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 根据排序方向初始化参数
|
|
|
|
|
|
long initialScore = descending ? long.MaxValue : 0;
|
|
|
|
|
|
decimal? currentScore = lastScore ?? initialScore;
|
|
|
|
|
|
string currentMember = lastMember;
|
|
|
|
|
|
var members = new List<string>(pageSize + 1);
|
|
|
|
|
|
|
|
|
|
|
|
// 使用游标分页查询
|
|
|
|
|
|
while (members.Count < pageSize + 1 && currentScore.HasValue)
|
|
|
|
|
|
{
|
|
|
|
|
|
var (batch, hasMore) = await GetNextBatch(
|
|
|
|
|
|
redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
pageSize + 1 - members.Count,
|
|
|
|
|
|
currentScore.Value,
|
|
|
|
|
|
currentMember,
|
|
|
|
|
|
descending);
|
|
|
|
|
|
|
|
|
|
|
|
if (!batch.Any()) break;
|
|
|
|
|
|
|
|
|
|
|
|
members.AddRange(batch);
|
|
|
|
|
|
|
|
|
|
|
|
// 更新游标
|
|
|
|
|
|
currentMember = batch.LastOrDefault();
|
|
|
|
|
|
currentScore = await GetNextScore(redisZSetScoresIndexCacheKey, currentMember, descending);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 处理分页结果
|
|
|
|
|
|
bool hasNext = members.Count > pageSize;
|
|
|
|
|
|
var resultMembers = members.Take(pageSize).ToList();
|
|
|
|
|
|
|
|
|
|
|
|
return (
|
|
|
|
|
|
resultMembers,
|
|
|
|
|
|
hasNext,
|
|
|
|
|
|
currentScore,
|
|
|
|
|
|
currentMember
|
|
|
|
|
|
);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 批量获取指定分页的数据
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <typeparam name="T"></typeparam>
|
|
|
|
|
|
/// <param name="redisHashCacheKey">Hash表缓存key</param>
|
|
|
|
|
|
/// <param name="members">Hash表字段集合</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
private async Task<Dictionary<string, T>> BatchGetData<T>(
|
|
|
|
|
|
string redisHashCacheKey,
|
|
|
|
|
|
IEnumerable<string> members)
|
|
|
|
|
|
where T : DeviceCacheBasicModel
|
|
|
|
|
|
{
|
|
|
|
|
|
using var pipe = Instance.StartPipe();
|
|
|
|
|
|
|
|
|
|
|
|
foreach (var member in members)
|
|
|
|
|
|
{
|
|
|
|
|
|
pipe.HGet<T>(redisHashCacheKey, member);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var results = pipe.EndPipe();
|
|
|
|
|
|
return await Task.FromResult(members.Zip(results, (k, v) => new { k, v })
|
|
|
|
|
|
.ToDictionary(x => x.k, x => (T)x.v));
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 处理下一个分页数据
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="zsetKey"></param>
|
|
|
|
|
|
/// <param name="limit"></param>
|
|
|
|
|
|
/// <param name="score"></param>
|
|
|
|
|
|
/// <param name="excludeMember"></param>
|
|
|
|
|
|
/// <param name="descending"></param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
private async Task<(string[] Batch, bool HasMore)> GetNextBatch(
|
|
|
|
|
|
string zsetKey,
|
|
|
|
|
|
int limit,
|
|
|
|
|
|
decimal score,
|
|
|
|
|
|
string excludeMember,
|
|
|
|
|
|
bool descending)
|
|
|
|
|
|
{
|
|
|
|
|
|
var query = descending
|
|
|
|
|
|
? await Instance.ZRevRangeByScoreAsync(
|
|
|
|
|
|
zsetKey,
|
|
|
|
|
|
max: score,
|
|
|
|
|
|
min: 0,
|
|
|
|
|
|
offset: 0,
|
|
|
|
|
|
count: limit)
|
|
|
|
|
|
: await Instance.ZRangeByScoreAsync(
|
|
|
|
|
|
zsetKey,
|
|
|
|
|
|
min: score,
|
|
|
|
|
|
max: long.MaxValue,
|
|
|
|
|
|
offset: 0,
|
|
|
|
|
|
count: limit);
|
|
|
|
|
|
|
|
|
|
|
|
return (query, query.Length >= limit);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 获取下一页游标
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
|
|
|
|
|
|
/// <param name="lastMember">最后一个唯一标识</param>
|
|
|
|
|
|
/// <param name="descending">排序方式</param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
private async Task<decimal?> GetNextScore(
|
|
|
|
|
|
string redisZSetScoresIndexCacheKey,
|
|
|
|
|
|
string lastMember,
|
|
|
|
|
|
bool descending)
|
|
|
|
|
|
{
|
|
|
|
|
|
if (string.IsNullOrEmpty(lastMember)) return null;
|
|
|
|
|
|
|
|
|
|
|
|
var score = await Instance.ZScoreAsync(redisZSetScoresIndexCacheKey, lastMember);
|
|
|
|
|
|
if (!score.HasValue) return null;
|
|
|
|
|
|
|
|
|
|
|
|
// 根据排序方向调整score
|
|
|
|
|
|
return descending
|
|
|
|
|
|
? score.Value - 1 // 降序时下页查询小于当前score
|
|
|
|
|
|
: score.Value + 1; // 升序时下页查询大于当前score
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 获取指定ZSET区间内的总数量
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="zsetKey"></param>
|
|
|
|
|
|
/// <param name="min"></param>
|
|
|
|
|
|
/// <param name="max"></param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
public async Task<long> GetCount(string zsetKey, long min, long max)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 缓存计数优化
|
|
|
|
|
|
var cacheKey = $"{zsetKey}_count_{min}_{max}";
|
|
|
|
|
|
var cached = await Instance.GetAsync<long?>(cacheKey);
|
|
|
|
|
|
|
|
|
|
|
|
if (cached.HasValue)
|
|
|
|
|
|
return cached.Value;
|
|
|
|
|
|
|
|
|
|
|
|
var count = await Instance.ZCountAsync(zsetKey, min, max);
|
|
|
|
|
|
await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
|
|
|
|
|
|
return count;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
/// 获取指定ZSET的总数量
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
/// <param name="redisZSetScoresIndexCacheKey"></param>
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
private async Task<long> GetTotalCount(string redisZSetScoresIndexCacheKey)
|
|
|
|
|
|
{
|
|
|
|
|
|
// 缓存计数优化
|
|
|
|
|
|
var cacheKey = $"{redisZSetScoresIndexCacheKey}_total_count";
|
|
|
|
|
|
var cached = await Instance.GetAsync<long?>(cacheKey);
|
|
|
|
|
|
|
|
|
|
|
|
if (cached.HasValue)
|
|
|
|
|
|
return cached.Value;
|
|
|
|
|
|
|
|
|
|
|
|
var count = await Instance.ZCountAsync(redisZSetScoresIndexCacheKey, 0, decimal.MaxValue);
|
|
|
|
|
|
await Instance.SetExAsync(cacheKey, 30, count); // 缓存30秒
|
|
|
|
|
|
return count;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|