502 lines
20 KiB
C#
Raw Normal View History

2025-04-17 20:28:50 +08:00
using System.Diagnostics;
using FreeRedis;
2025-04-15 16:05:07 +08:00
using JiShe.CollectBus.Common.Helpers;
2025-04-17 20:28:50 +08:00
using JiShe.CollectBus.FreeRedis.Options;
2025-03-17 08:35:19 +08:00
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
2025-04-17 20:28:50 +08:00
namespace JiShe.CollectBus.FreeRedis
2025-03-17 08:35:19 +08:00
{
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
{
private readonly FreeRedisOptions _option;
/// <summary>
/// FreeRedis
/// </summary>
public FreeRedisProvider(IOptions<FreeRedisOptions> options)
{
_option = options.Value;
GetInstance();
}
2025-04-15 15:49:51 +08:00
public RedisClient Instance { get; set; } = new(string.Empty);
2025-03-17 08:35:19 +08:00
/// <summary>
/// 获取 FreeRedis 客户端
/// </summary>
/// <returns></returns>
public IRedisClient GetInstance()
{
2025-04-15 15:49:51 +08:00
2025-04-16 17:36:46 +08:00
var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB},MaxPoolSize={_option.MaxPoolSize}";
2025-03-17 08:35:19 +08:00
Instance = new RedisClient(connectionString);
Instance.Serialize = obj => BusJsonSerializer.Serialize(obj);
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
2025-04-16 17:36:46 +08:00
Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
2025-03-17 08:35:19 +08:00
return Instance;
}
2025-04-15 15:49:51 +08:00
2025-04-16 17:36:46 +08:00
///// <summary>
///// 单个添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task AddMeterCacheData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{
// // 参数校验增强
// if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
// }
// // 计算组合score分类ID + 时间戳)
// var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
// //全局索引写入
// long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 使用事务保证原子性
// using (var trans = Instance.Multi())
// {
// // 主数据存储Hash
// trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
// // 分类索引
// trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
// // 排序索引使用ZSET
// trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
// //全局索引
// trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
// var results = trans.Exec();
// if (results == null || results.Length <= 0)
// throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
// }
// await Task.CompletedTask;
//}
///// <summary>
///// 批量添加数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="items">数据集合</param>
///// <param name="timestamp">可选时间戳</param>
///// <returns></returns>
//public async Task BatchAddMeterData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//IEnumerable<T> items,
//DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
//{
// if (items == null
// || items.Count() <=0
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
// }
// const int BATCH_SIZE = 1000; // 每批1000条
// var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
// foreach (var batch in items.Batch(BATCH_SIZE))
// {
// await semaphore.WaitAsync();
// _ = Task.Run(() =>
// {
// using (var pipe = Instance.StartPipe())
// {
// foreach (var item in batch)
// {
// // 计算组合score分类ID + 时间戳)
// var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
// //全局索引写入
// long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 主数据存储Hash
// pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
// // 分类索引Set
// pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
// // 排序索引使用ZSET
// pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
// //全局索引
// pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
// }
// pipe.EndPipe();
// }
// semaphore.Release();
// });
// }
// await Task.CompletedTask;
//}
///// <summary>
///// 删除指定redis缓存key的缓存数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="data">表计信息</param>
///// <returns></returns>
//public async Task RemoveMeterData<T>(
//string redisCacheKey,
//string redisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T data) where T : DeviceCacheBasicModel
//{
// if (data == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
// }
// const string luaScript = @"
// local mainKey = KEYS[1]
// local focusIndexKey = KEYS[2]
// local scoresIndexKey = KEYS[3]
// local globalIndexKey = KEYS[4]
// local member = ARGV[1]
// local deleted = 0
// if redis.call('HDEL', mainKey, member) > 0 then
// deleted = 1
// end
// redis.call('SREM', focusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
// redis.call('ZREM', globalIndexKey, member)
// return deleted
// ";
// var keys = new[]
// {
// redisCacheKey,
// redisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// };
// var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
// if ((int)result == 0)
// throw new KeyNotFoundException("指定数据不存在");
//}
///// <summary>
///// 修改表计缓存信息
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
///// <param name="oldRedisCacheFocusIndexKey">旧集中器索引Set缓存Key</param>
///// <param name="newRedisCacheFocusIndexKey">新集中器索引Set缓存Key</param>
///// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
///// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
///// <param name="newData">表计信息</param>
///// <param name="newTimestamp">可选时间戳</param>
///// <returns></returns>
//public async Task UpdateMeterData<T>(
//string redisCacheKey,
//string oldRedisCacheFocusIndexKey,
//string newRedisCacheFocusIndexKey,
//string redisCacheScoresIndexKey,
//string redisCacheGlobalIndexKey,
//T newData,
//DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
//{
// if (newData == null
// || string.IsNullOrWhiteSpace(redisCacheKey)
// || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
// || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
// {
// throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
// }
// var luaScript = @"
// local mainKey = KEYS[1]
// local oldFocusIndexKey = KEYS[2]
// local newFocusIndexKey = KEYS[3]
// local scoresIndexKey = KEYS[4]
// local globalIndexKey = KEYS[5]
// local member = ARGV[1]
// local newData = ARGV[2]
// local newScore = ARGV[3]
// local newGlobalScore = ARGV[4]
// -- 校验存在性
// if redis.call('HEXISTS', mainKey, member) == 0 then
// return 0
// end
// -- 更新主数据
// redis.call('HSET', mainKey, member, newData)
// -- 处理变更
// if newScore ~= '' then
// -- 删除旧索引
// redis.call('SREM', oldFocusIndexKey, member)
// redis.call('ZREM', scoresIndexKey, member)
2025-04-15 23:20:46 +08:00
2025-04-16 17:36:46 +08:00
// -- 添加新索引
// redis.call('SADD', newFocusIndexKey, member)
// redis.call('ZADD', scoresIndexKey, newScore, member)
// end
// -- 更新全局索引
// if newGlobalScore ~= '' then
// -- 删除旧索引
// redis.call('ZREM', globalIndexKey, member)
// -- 添加新索引
// redis.call('ZADD', globalIndexKey, newGlobalScore, member)
// end
// return 1
// ";
// var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
// var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
// var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
// var result = await Instance.EvalAsync(luaScript,
// new[]
// {
// redisCacheKey,
// oldRedisCacheFocusIndexKey,
// newRedisCacheFocusIndexKey,
// redisCacheScoresIndexKey,
// redisCacheGlobalIndexKey
// },
// new object[]
// {
// newData.MemberID,
// newData.Serialize(),
// newScoreValue.ToString() ?? "",
// newGlobalScore.ToString() ?? ""
// });
// if ((int)result == 0)
// {
// throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
// }
//}
2025-04-15 23:20:46 +08:00
2025-04-16 17:36:46 +08:00
//public async Task<BusPagedResult<T>> SingleGetMeterPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//int focusId,
//int pageSize = 10,
//int pageIndex = 1,
//bool descending = true)
//{
// // 计算score范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 分页参数计算
// int start = (pageIndex - 1) * pageSize;
// // 获取排序后的member列表
// var members = descending
// ? await Instance.ZRevRangeByScoreAsync(
// redisCacheScoresIndexKey,
// maxScore,
// minScore,
// start,
// pageSize)
// : await Instance.ZRangeByScoreAsync(
// redisCacheScoresIndexKey,
// minScore,
// maxScore,
// start,
// pageSize);
// // 批量获取实际数据
// var dataTasks = members.Select(m =>
// Instance.HGetAsync<T>(redisCacheKey, m)).ToArray();
// await Task.WhenAll(dataTasks);
// // 总数统计优化
// var total = await Instance.ZCountAsync(
// redisCacheScoresIndexKey,
// minScore,
// maxScore);
// return new BusPagedResult<T>
// {
// Items = dataTasks.Select(t => t.Result).ToList(),
// TotalCount = total,
// PageIndex = pageIndex,
// PageSize = pageSize
// };
//}
//public async Task<BusPagedResult<T>> GetFocusPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//int focusId,
//int pageSize = 10,
//long? lastScore = null,
//string lastMember = null,
//bool descending = true) where T : DeviceCacheBasicModel
//{
// // 计算分数范围
// long minScore = (long)focusId << 32;
// long maxScore = ((long)focusId + 1) << 32;
// // 获取成员列表
// var members = await GetSortedMembers(
// redisCacheScoresIndexKey,
// minScore,
// maxScore,
// pageSize,
// lastScore,
// lastMember,
// descending);
// // 批量获取数据
// var dataDict = await Instance.HMGetAsync<T>(redisCacheKey, members.CurrentItems);
// return new BusPagedResult<T>
// {
// Items = dataDict,
// TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
// HasNext = members.HasNext,
// NextScore = members.NextScore,
// NextMember = members.NextMember
// };
//}
//private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
// GetSortedMembers(
// string zsetKey,
// long minScore,
// long maxScore,
// int pageSize,
// long? lastScore,
// string lastMember,
// bool descending)
//{
// var querySize = pageSize + 1;
// var (startScore, exclude) = descending
// ? (lastScore ?? maxScore, lastMember)
// : (lastScore ?? minScore, lastMember);
// var members = descending
// ? await Instance.ZRevRangeByScoreAsync(
// zsetKey,
// max: startScore,
// min: minScore,
// offset: 0,
// count: querySize)
// : await Instance.ZRangeByScoreAsync(
// zsetKey,
// min: startScore,
// max: maxScore,
// offset: 0,
// count: querySize);
// var hasNext = members.Length > pageSize;
// var currentItems = members.Take(pageSize).ToArray();
// var nextCursor = currentItems.Any()
// ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
// : (null, null);
// return (currentItems, hasNext, nextCursor.score, nextCursor.member);
//}
//private async Task<long> GetTotalCount(string zsetKey, long min, long max)
//{
// // 缓存计数优化
// var cacheKey = $"{zsetKey}_count_{min}_{max}";
// var cached = await Instance.GetAsync<long?>(cacheKey);
// if (cached.HasValue)
// return cached.Value;
// var count = await Instance.ZCountAsync(zsetKey, min, max);
// await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
// return count;
//}
//public async Task<Dictionary<int, BusPagedResult<T>>> BatchGetMeterPagedData<T>(
//string redisCacheKey,
//string redisCacheScoresIndexKey,
//IEnumerable<int> focusIds,
//int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
//{
// var results = new ConcurrentDictionary<int, BusPagedResult<T>>();
// var parallelOptions = new ParallelOptions
// {
// MaxDegreeOfParallelism = Environment.ProcessorCount * 2
// };
// await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
// {
// var data = await SingleGetMeterPagedData<T>(
// redisCacheKey,
// redisCacheScoresIndexKey,
// focusId,
// pageSizePerFocus);
// results.TryAdd(focusId, data);
// });
// return new Dictionary<int, BusPagedResult<T>>(results);
//}
2025-04-15 15:49:51 +08:00
2025-04-15 23:20:46 +08:00
2025-03-17 08:35:19 +08:00
}
}