using FreeRedis;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text.Json;
using Volo.Abp.DependencyInjection;
using static System.Runtime.InteropServices.JavaScript.JSType;
using System.Collections.Concurrent;
namespace JiShe.CollectBus.FreeRedisProvider
{
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
{
private readonly FreeRedisOptions _option;
///
/// FreeRedis
///
public FreeRedisProvider(IOptions options)
{
_option = options.Value;
GetInstance();
}
public RedisClient Instance { get; set; } = new(string.Empty);
///
/// 获取 FreeRedis 客户端
///
///
public IRedisClient GetInstance()
{
var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}";
Instance = new RedisClient(connectionString);
Instance.Serialize = obj => BusJsonSerializer.Serialize(obj);
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance;
}
///
/// 单个添加数据
///
///
/// 主数据存储Hash缓存Key
/// 集中器索引Set缓存Key
/// 集中器排序索引ZSET缓存Key
/// 集中器采集频率分组全局索引ZSet缓存Key
/// 表计信息
/// 可选时间戳
///
public async Task AddMeterCacheData(
string redisCacheKey,
string redisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
T data,
DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
}
// 计算组合score(分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
// 分类索引
trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
// 排序索引使用ZSET
trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
//全局索引
trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
var results = trans.Exec();
if (results == null || results.Length <= 0)
throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
}
await Task.CompletedTask;
}
///
/// 批量添加数据
///
///
/// 主数据存储Hash缓存Key
/// 集中器索引Set缓存Key
/// 集中器排序索引ZSET缓存Key
/// 集中器采集频率分组全局索引ZSet缓存Key
/// 数据集合
/// 可选时间戳
///
public async Task BatchAddMeterData(
string redisCacheKey,
string redisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
IEnumerable items,
DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <=0
|| string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
}
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
foreach (var batch in items.Batch(BATCH_SIZE))
{
await semaphore.WaitAsync();
_ = Task.Run(() =>
{
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
// 计算组合score(分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// 主数据存储Hash
pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
// 分类索引
pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
// 排序索引使用ZSET
pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
//全局索引
pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask;
}
///
/// 删除指定redis缓存key的缓存数据
///
///
/// 主数据存储Hash缓存Key
/// 集中器索引Set缓存Key
/// 集中器排序索引ZSET缓存Key
/// 集中器采集频率分组全局索引ZSet缓存Key
/// 表计信息
///
public async Task RemoveMeterData(
string redisCacheKey,
string redisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
T data) where T : DeviceCacheBasicModel
{
if (data == null
|| string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
}
const string luaScript = @"
local mainKey = KEYS[1]
local focusIndexKey = KEYS[2]
local scoresIndexKey = KEYS[3]
local globalIndexKey = KEYS[4]
local member = ARGV[1]
local deleted = 0
if redis.call('HDEL', mainKey, member) > 0 then
deleted = 1
end
redis.call('SREM', focusIndexKey, member)
redis.call('ZREM', scoresIndexKey, member)
redis.call('ZREM', globalIndexKey, member)
return deleted
";
var keys = new[]
{
redisCacheKey,
redisCacheFocusIndexKey,
redisCacheScoresIndexKey,
redisCacheGlobalIndexKey
};
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
if ((int)result == 0)
throw new KeyNotFoundException("指定数据不存在");
}
///
/// 修改表计缓存信息
///
///
/// 主数据存储Hash缓存Key
/// 旧集中器索引Set缓存Key
/// 新集中器索引Set缓存Key
/// 集中器排序索引ZSET缓存Key
/// 集中器采集频率分组全局索引ZSet缓存Key
/// 表计信息
/// 可选时间戳
///
public async Task UpdateMeterData(
string redisCacheKey,
string oldRedisCacheFocusIndexKey,
string newRedisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
T newData,
DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
{
if (newData == null
|| string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
}
var luaScript = @"
local mainKey = KEYS[1]
local oldFocusIndexKey = KEYS[2]
local newFocusIndexKey = KEYS[3]
local scoresIndexKey = KEYS[4]
local globalIndexKey = KEYS[5]
local member = ARGV[1]
local newData = ARGV[2]
local newScore = ARGV[3]
local newGlobalScore = ARGV[4]
-- 校验存在性
if redis.call('HEXISTS', mainKey, member) == 0 then
return 0
end
-- 更新主数据
redis.call('HSET', mainKey, member, newData)
-- 处理变更
if newScore ~= '' then
-- 删除旧索引
redis.call('SREM', oldFocusIndexKey, member)
redis.call('ZREM', scoresIndexKey, member)
-- 添加新索引
redis.call('SADD', newFocusIndexKey, member)
redis.call('ZADD', scoresIndexKey, newScore, member)
end
-- 更新全局索引
if newGlobalScore ~= '' then
-- 删除旧索引
redis.call('ZREM', globalIndexKey, member)
-- 添加新索引
redis.call('ZADD', globalIndexKey, newGlobalScore, member)
end
return 1
";
var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisCacheKey,
oldRedisCacheFocusIndexKey,
newRedisCacheFocusIndexKey,
redisCacheScoresIndexKey,
redisCacheGlobalIndexKey
},
new object[]
{
newData.MemberID,
newData.Serialize(),
newScoreValue.ToString() ?? "",
newGlobalScore.ToString() ?? ""
});
if ((int)result == 0)
{
throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
}
}
public async Task> SingleGetMeterPagedData(
string redisCacheKey,
string redisCacheScoresIndexKey,
int focusId,
int pageSize = 10,
int pageIndex = 1,
bool descending = true)
{
// 计算score范围
long minScore = (long)focusId << 32;
long maxScore = ((long)focusId + 1) << 32;
// 分页参数计算
int start = (pageIndex - 1) * pageSize;
// 获取排序后的member列表
var members = descending
? await Instance.ZRevRangeByScoreAsync(
redisCacheScoresIndexKey,
maxScore,
minScore,
start,
pageSize)
: await Instance.ZRangeByScoreAsync(
redisCacheScoresIndexKey,
minScore,
maxScore,
start,
pageSize);
// 批量获取实际数据
var dataTasks = members.Select(m =>
Instance.HGetAsync(redisCacheKey, m)).ToArray();
await Task.WhenAll(dataTasks);
// 总数统计优化
var total = await Instance.ZCountAsync(
redisCacheScoresIndexKey,
minScore,
maxScore);
return new BusPagedResult
{
Items = dataTasks.Select(t => t.Result).ToList(),
TotalCount = total,
PageIndex = pageIndex,
PageSize = pageSize
};
}
public async Task> GetFocusPagedData(
string redisCacheKey,
string redisCacheScoresIndexKey,
int focusId,
int pageSize = 10,
long? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel
{
// 计算分数范围
long minScore = (long)focusId << 32;
long maxScore = ((long)focusId + 1) << 32;
// 获取成员列表
var members = await GetSortedMembers(
redisCacheScoresIndexKey,
minScore,
maxScore,
pageSize,
lastScore,
lastMember,
descending);
// 批量获取数据
var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems);
return new BusPagedResult
{
Items = dataDict,
TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
HasNext = members.HasNext,
NextScore = members.NextScore,
NextMember = members.NextMember
};
}
private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
GetSortedMembers(
string zsetKey,
long minScore,
long maxScore,
int pageSize,
long? lastScore,
string lastMember,
bool descending)
{
var querySize = pageSize + 1;
var (startScore, exclude) = descending
? (lastScore ?? maxScore, lastMember)
: (lastScore ?? minScore, lastMember);
var members = descending
? await Instance.ZRevRangeByScoreAsync(
zsetKey,
max: startScore,
min: minScore,
offset: 0,
count: querySize)
: await Instance.ZRangeByScoreAsync(
zsetKey,
min: startScore,
max: maxScore,
offset: 0,
count: querySize);
var hasNext = members.Length > pageSize;
var currentItems = members.Take(pageSize).ToArray();
var nextCursor = currentItems.Any()
? await GetNextCursor(zsetKey, currentItems.Last(), descending)
: (null, null);
return (currentItems, hasNext, nextCursor.score, nextCursor.member);
}
private async Task GetTotalCount(string zsetKey, long min, long max)
{
// 缓存计数优化
var cacheKey = $"{zsetKey}_count_{min}_{max}";
var cached = await Instance.GetAsync(cacheKey);
if (cached.HasValue)
return cached.Value;
var count = await Instance.ZCountAsync(zsetKey, min, max);
await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
return count;
}
public async Task>> BatchGetMeterPagedData(
string redisCacheKey,
string redisCacheScoresIndexKey,
IEnumerable focusIds,
int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
{
var results = new ConcurrentDictionary>();
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount * 2
};
await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
{
var data = await SingleGetMeterPagedData(
redisCacheKey,
redisCacheScoresIndexKey,
focusId,
pageSizePerFocus);
results.TryAdd(focusId, data);
});
return new Dictionary>(results);
}
///
/// 通过全局索引分页查询表计缓存数据
///
///
/// 主数据存储Hash缓存Key
/// 集中器采集频率分组全局索引ZSet缓存Key
/// 分页尺寸
/// 最后一个索引
/// 最后一个唯一标识
/// 排序方式
///
public async Task> GetGlobalPagedData(
string redisCacheKey,
string redisCacheGlobalIndexKey,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
// 参数校验增强
if (string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
throw new ArgumentException($"{nameof(GetGlobalPagedData)} 参数异常,-101");
}
if (pageSize < 1 || pageSize > 1000)
{
throw new ArgumentException($"{nameof(GetGlobalPagedData)} 分页大小应在1-1000之间,-102");
}
// 分页参数解析
var (startScore, excludeMember) = descending
? (lastScore ?? decimal.MaxValue, lastMember)
: (lastScore ?? 0, lastMember);
// 游标分页查询
var (members, hasNext) = await GetPagedMembers(
redisCacheGlobalIndexKey,
pageSize,
startScore,
excludeMember,
descending);
// 批量获取数据(优化内存分配)
var dataDict = await BatchGetData(redisCacheKey, members);
// 获取下一页游标
var nextCursor = members.Any()
? await GetNextCursor(redisCacheGlobalIndexKey, members.Last(), descending)
: (null, null);
return new BusCacheGlobalPagedResult
{
Items = members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default)
.Where(x => x != null).ToList(),
HasNext = hasNext,
NextScore = nextCursor.score,
NextMember = nextCursor.member
};
}
///
/// 游标分页查询
///
///
/// 分页数量
/// 开始索引
/// 开始唯一标识
/// 排序方式
///
private async Task<(List Members, bool HasNext)> GetPagedMembers(
string redisCacheGlobalIndexKey,
int pageSize,
decimal? startScore,
string excludeMember,
bool descending)
{
const int bufferSize = 50; // 预读缓冲区大小
// 使用流式分页(避免OFFSET性能问题)
var members = new List(pageSize + 1);
decimal? currentScore = startScore;
string lastMember = excludeMember;
while (members.Count < pageSize + 1 && currentScore.HasValue)
{
var querySize = Math.Min(bufferSize, pageSize + 1 - members.Count);
var batch = descending
? await Instance.ZRevRangeByScoreAsync(
redisCacheGlobalIndexKey,
max: currentScore.Value,
min: 0,
offset: 0,
count: querySize
)
: await Instance.ZRangeByScoreAsync(
redisCacheGlobalIndexKey,
min: currentScore.Value,
max: long.MaxValue,
offset: 0,
count: querySize);
if (!batch.Any()) break;
members.AddRange(batch);
lastMember = batch.LastOrDefault();
currentScore = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember);
}
return (
members.Take(pageSize).ToList(),
members.Count > pageSize
);
}
///
/// 批量获取指定分页的数据
///
///
///
///
///
private async Task> BatchGetData(
string hashKey,
IEnumerable members)
where T : DeviceCacheBasicModel
{
const int batchSize = 100;
var result = new Dictionary();
foreach (var batch in members.Batch(batchSize))
{
var batchArray = batch.ToArray();
var values = await Instance.HMGetAsync(hashKey, batchArray);
for (int i = 0; i < batchArray.Length; i++)
{
if (EqualityComparer.Default.Equals(values[i], default)) continue;
result[batchArray[i]] = values[i];
}
}
return result;
}
///
/// 获取下一页游标
///
/// 全局索引Key
/// 最后一个唯一标识
/// 排序方式
///
private async Task<(decimal? score, string member)> GetNextCursor(
string redisCacheGlobalIndexKey,
string lastMember,
bool descending)
{
if (string.IsNullOrWhiteSpace(lastMember))
{
return (null, null);
}
var score = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember);
return score.HasValue
? (Convert.ToInt64(score.Value), lastMember)
: (null, null);
}
}
}