746 lines
28 KiB
C#
Raw Normal View History

2025-04-16 17:36:46 +08:00
using Confluent.Kafka;
using FreeRedis;
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedisProvider;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.RedisDataCache
{
/// <summary>
/// 数据缓存服务接口
/// </summary>
public class RedisDataCacheService : IRedisDataCacheService, ITransientDependency
{
private readonly IFreeRedisProvider _freeRedisProvider;
private readonly ILogger<RedisDataCacheService> _logger;
private RedisClient Instance { get; set; }
/// <summary>
/// 数据缓存服务接口
/// </summary>
/// <param name="freeRedisProvider"></param>
/// <param name="logger"></param>
public RedisDataCacheService(IFreeRedisProvider freeRedisProvider,
ILogger<RedisDataCacheService> logger)
{
this._freeRedisProvider = freeRedisProvider;
this._logger = logger;
Instance = _freeRedisProvider.Instance;
}
/// <summary>
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">待缓存数据</param>
/// <returns></returns>
public async Task InsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101");
return;
}
// 使用事务保证原子性
using (var trans = Instance.Multi())
{
// 主数据存储Hash
trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize());
// 集中器号分组索引Set缓存
trans.SAdd(redisSetIndexCacheKey, data.MemberID);
// 集中器与表计信息排序索引ZSET缓存Key
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID);
var results = trans.Exec();
if (results == null || results.Length <= 0)
{
_logger.LogError($"{nameof(InsertDataAsync)} 添加事务提交失败,-102");
}
}
await Task.CompletedTask;
}
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
public async Task BatchInsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return;
}
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
foreach (var batch in items.Batch(BATCH_SIZE))
{
await semaphore.WaitAsync();
_ = Task.Run(() =>
{
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
// 主数据存储Hash
pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize());
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, item.MemberID);
// ZSET索引缓存Key
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID);
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask;
}
/// <summary>
/// 删除缓存信息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="data">已缓存数据</param>
/// <returns></returns>
public async Task RemoveCacheDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T data) where T : DeviceCacheBasicModel
{
if (data == null
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101");
return;
}
const string luaScript = @"
local hashCacheKey = KEYS[1]
local setIndexCacheKey = KEYS[2]
local zsetScoresIndexCacheKey = KEYS[3]
local member = ARGV[1]
local deleted = 0
if redis.call('HDEL', hashCacheKey, member) > 0 then
deleted = 1
end
redis.call('SREM', setIndexCacheKey, member)
redis.call('ZREM', zsetScoresIndexCacheKey, member)
return deleted
";
var keys = new[]
{
redisHashCacheKey,
redisSetIndexCacheKey,
redisZSetScoresIndexCacheKey
};
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
if ((int)result == 0)
{
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102");
}
}
/// <summary>
/// 修改缓存信息,映射关系未发生改变
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="newData">待修改缓存数据</param>
/// <returns></returns>
public async Task ModifyDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel
{
if (newData == null
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
return;
}
var luaScript = @"
local hashCacheKey = KEYS[1]
local member = ARGV[1]
local newData = ARGV[2]
--
if redis.call('HEXISTS', hashCacheKey, member) == 0 then
return 0
end
--
redis.call('HSET', hashCacheKey, member, newData)
return 1
";
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisHashCacheKey
},
new object[]
{
newData.MemberID,
newData.Serialize()
});
if ((int)result == 0)
{
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
}
}
/// <summary>
/// 修改缓存信息,映射关系已经改变
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="oldMemberId">旧的映射关系</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="newData">待修改缓存数据</param>
/// <returns></returns>
public async Task ModifyDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string oldMemberId,
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel
{
if (newData == null
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(oldMemberId)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
return;
}
var luaScript = @"
local hashCacheKey = KEYS[1]
local setIndexCacheKey = KEYS[2]
local zsetScoresIndexCacheKey = KEYS[3]
local member = ARGV[1]
local oldMember = ARGV[2]
local newData = ARGV[3]
local newScore = ARGV[4]
--
if redis.call('HEXISTS', hashCacheKey, oldMember) == 0 then
return 0
end
--
redis.call('HDEL', hashCacheKey, oldMember)
--
redis.call('HSET', hashCacheKey, member, newData)
--
if newScore ~= '' then
--
redis.call('SREM', setIndexCacheKey, oldMember)
redis.call('ZREM', zsetScoresIndexCacheKey, oldMember)
--
redis.call('SADD', setIndexCacheKey, member)
redis.call('ZADD', zsetScoresIndexCacheKey, newScore, member)
end
return 1
";
var result = await Instance.EvalAsync(luaScript,
new[]
{
redisHashCacheKey,
redisSetIndexCacheKey,
redisZSetScoresIndexCacheKey
},
new object[]
{
newData.MemberID,
oldMemberId,
newData.Serialize(),
newData.ScoreValue.ToString() ?? "",
});
if ((int)result == 0)
{
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
}
}
/// <summary>
/// 通过集中器与表计信息排序索引获取指定集中器号集合数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisCacheFocusScoresIndexKey">集中器与表计信息排序索引ZSET缓存Key</param>
/// <param name="focusIds">集中器Id</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisCacheKey,
string redisCacheFocusScoresIndexKey,
IEnumerable<int> focusIds,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
throw new Exception();
}
/// <summary>
/// 通过ZSET索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
// 参数校验(保持不变)
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
return null;
}
if (pageSize < 1 || pageSize > 10000)
{
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
return null;
}
var luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
--
if last_score ~= '' and last_member ~= '' then
if command == 'ZRANGEBYSCORE' then
range_start = '(' .. last_score
range_end = '(' .. last_score .. ' ' .. last_member
else
range_start = '(' .. last_score .. ' ' .. last_member
range_end = '(' .. last_score
end
end
--
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call(command, KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit)
else
members = redis.call(command, KEYS[1], range_end, range_start,
'WITHSCORES', 'LIMIT', 0, limit)
end
--
local result_members = {}
local result_scores = {}
for i = 1, #members, 2 do
table.insert(result_members, members[i])
table.insert(result_scores, members[i+1])
end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {
#result_members,
result_members,
result_scores,
hash_data
}";
//正确设置范围参数
string rangeStart, rangeEnd;
if (descending)
{
rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf";
rangeEnd = "-inf"; // 降序时固定为最小值
}
else
{
rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf";
rangeEnd = "+inf"; // 升序时固定为最大值
}
var result = (object[])await Instance.EvalAsync(
luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[]
{
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart,
rangeEnd,
(pageSize + 1).ToString(), // 多取1条用于判断hasNext
lastScore?.ToString() ?? "",
lastMember ?? ""
});
if ((long)result[0] == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集
var members = ((object[])result[1]).Cast<string>().ToList();
var scores = ((object[])result[2]).Cast<string>().Select(decimal.Parse).ToList();
var hashData = ((object[])result[3]).Cast<string>().ToList();
//合并有效数据并处理游标
var validItems = members.Zip(hashData, (m, h) =>
!string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize<T>(h) : null)
.Where(x => x != null)
.Take(pageSize + 1)
.ToList();
var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
// 计算下一页起始点
string nextMember = null;
decimal? nextScore = null;
if (hasNext)
{
// 获取实际返回的最后一条有效数据
var lastValidIndex = actualItems.Count() - 1;
nextMember = members[lastValidIndex];
nextScore = scores[lastValidIndex];
}
return new BusCacheGlobalPagedResult<T>
{
Items = actualItems.ToList(),
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember,
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
PageSize = pageSize,
};
}
///// <summary>
///// 通过集中器与表计信息排序索引获取数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
///// <param name="pageSize">分页尺寸</param>
///// <param name="lastScore">最后一个索引</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedData<T>(
//string redisHashCacheKey,
//string redisZSetScoresIndexCacheKey,
//int pageSize = 1000,
//decimal? lastScore = null,
//string lastMember = null,
//bool descending = true)
//where T : DeviceCacheBasicModel
//{
// // 参数校验增强
// if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
// {
// _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
// return null;
// }
// if (pageSize < 1 || pageSize > 10000)
// {
// _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
// return null;
// }
// //// 分页参数解析
// //var (startScore, excludeMember) = descending
// // ? (lastScore ?? decimal.MaxValue, lastMember)
// // : (lastScore ?? 0, lastMember);
// //执行分页查询(整合游标处理)
// var pageResult = await GetPagedMembers(
// redisZSetScoresIndexCacheKey,
// pageSize,
// lastScore,
// lastMember,
// descending);
// // 批量获取数据(优化内存分配)
// var dataDict = await BatchGetData<T>(redisHashCacheKey, pageResult.Members);
// return new BusCacheGlobalPagedResult<T>
// {
// Items = pageResult.Members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default)
// .Where(x => x != null).ToList(),
// HasNext = pageResult.HasNext,
// NextScore = pageResult.NextScore,
// NextMember = pageResult.NextMember,
// TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
// PageSize = pageSize,
// };
//}
/// <summary>
/// 游标分页查询
/// </summary>
/// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
/// <param name="pageSize">分页数量</param>
/// <param name="lastScore">上一个索引</param>
/// <param name="lastMember">上一个标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
private async Task<(List<string> Members, bool HasNext, decimal? NextScore, string NextMember)> GetPagedMembers(
string redisZSetScoresIndexCacheKey,
int pageSize,
decimal? lastScore,
string lastMember,
bool descending)
{
// 根据排序方向初始化参数
long initialScore = descending ? long.MaxValue : 0;
decimal? currentScore = lastScore ?? initialScore;
string currentMember = lastMember;
var members = new List<string>(pageSize + 1);
// 使用游标分页查询
while (members.Count < pageSize + 1 && currentScore.HasValue)
{
var (batch, hasMore) = await GetNextBatch(
redisZSetScoresIndexCacheKey,
pageSize + 1 - members.Count,
currentScore.Value,
currentMember,
descending);
if (!batch.Any()) break;
members.AddRange(batch);
// 更新游标
currentMember = batch.LastOrDefault();
currentScore = await GetNextScore(redisZSetScoresIndexCacheKey, currentMember, descending);
}
// 处理分页结果
bool hasNext = members.Count > pageSize;
var resultMembers = members.Take(pageSize).ToList();
return (
resultMembers,
hasNext,
currentScore,
currentMember
);
}
/// <summary>
/// 批量获取指定分页的数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">Hash表缓存key</param>
/// <param name="members">Hash表字段集合</param>
/// <returns></returns>
private async Task<Dictionary<string, T>> BatchGetData<T>(
string redisHashCacheKey,
IEnumerable<string> members)
where T : DeviceCacheBasicModel
{
using var pipe = Instance.StartPipe();
foreach (var member in members)
{
pipe.HGet<T>(redisHashCacheKey, member);
}
var results = pipe.EndPipe();
return await Task.FromResult(members.Zip(results, (k, v) => new { k, v })
.ToDictionary(x => x.k, x => (T)x.v));
}
/// <summary>
/// 处理下一个分页数据
/// </summary>
/// <param name="zsetKey"></param>
/// <param name="limit"></param>
/// <param name="score"></param>
/// <param name="excludeMember"></param>
/// <param name="descending"></param>
/// <returns></returns>
private async Task<(string[] Batch, bool HasMore)> GetNextBatch(
string zsetKey,
int limit,
decimal score,
string excludeMember,
bool descending)
{
var query = descending
? await Instance.ZRevRangeByScoreAsync(
zsetKey,
max: score,
min: 0,
offset: 0,
count: limit)
: await Instance.ZRangeByScoreAsync(
zsetKey,
min: score,
max: long.MaxValue,
offset: 0,
count: limit);
return (query, query.Length >= limit);
}
/// <summary>
/// 获取下一页游标
/// </summary>
/// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
private async Task<decimal?> GetNextScore(
string redisZSetScoresIndexCacheKey,
string lastMember,
bool descending)
{
if (string.IsNullOrEmpty(lastMember)) return null;
var score = await Instance.ZScoreAsync(redisZSetScoresIndexCacheKey, lastMember);
if (!score.HasValue) return null;
// 根据排序方向调整score
return descending
? score.Value - 1 // 降序时下页查询小于当前score
: score.Value + 1; // 升序时下页查询大于当前score
}
/// <summary>
/// 获取指定ZSET区间内的总数量
/// </summary>
/// <param name="zsetKey"></param>
/// <param name="min"></param>
/// <param name="max"></param>
/// <returns></returns>
public async Task<long> GetCount(string zsetKey, long min, long max)
{
// 缓存计数优化
var cacheKey = $"{zsetKey}_count_{min}_{max}";
var cached = await Instance.GetAsync<long?>(cacheKey);
if (cached.HasValue)
return cached.Value;
var count = await Instance.ZCountAsync(zsetKey, min, max);
await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
return count;
}
/// <summary>
/// 获取指定ZSET的总数量
/// </summary>
/// <param name="redisZSetScoresIndexCacheKey"></param>
/// <returns></returns>
private async Task<long> GetTotalCount(string redisZSetScoresIndexCacheKey)
{
// 缓存计数优化
var cacheKey = $"{redisZSetScoresIndexCacheKey}_total_count";
var cached = await Instance.GetAsync<long?>(cacheKey);
if (cached.HasValue)
return cached.Value;
var count = await Instance.ZCountAsync(redisZSetScoresIndexCacheKey, 0, decimal.MaxValue);
await Instance.SetExAsync(cacheKey, 30, count); // 缓存30秒
return count;
}
}
}