using System.Diagnostics;
using FreeRedis;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeRedis.Options;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;

namespace JiShe.CollectBus.FreeRedis
{
    /// <summary>
    /// Builds and exposes the FreeRedis <see cref="RedisClient"/> configured from
    /// <see cref="FreeRedisOptions"/>.
    /// </summary>
    /// <remarks>
    /// The type carries the <see cref="ISingletonDependency"/> marker, so it is presumably
    /// registered with singleton lifetime by ABP (confirm against the module registration).
    /// </remarks>
    public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
    {
        private readonly FreeRedisOptions _option;

        /// <summary>
        /// Captures the FreeRedis options and creates the client exposed through <see cref="Instance"/>.
        /// </summary>
        /// <param name="options">Options wrapper whose <c>Value</c> supplies the connection settings.</param>
        public FreeRedisProvider(IOptions<FreeRedisOptions> options)
        {
            _option = options.Value;

            // Create the one real client directly. The previous "new(string.Empty)" property
            // initializer allocated a placeholder client that was replaced right here and
            // never disposed.
            Instance = CreateClient();
        }

        /// <summary>
        /// The FreeRedis client created by this provider.
        /// </summary>
        public RedisClient Instance { get; set; }

        /// <summary>
        /// Gets the FreeRedis client.
        /// </summary>
        /// <returns>
        /// The client held in <see cref="Instance"/>. A new client is built only when
        /// <see cref="Instance"/> is currently <c>null</c>.
        /// </returns>
        public IRedisClient GetInstance()
        {
            // Reuse the existing client: building a new RedisClient on every call would
            // overwrite Instance and leave the previous client undisposed.
            return Instance ??= CreateClient();
        }

        /// <summary>
        /// Builds a <see cref="RedisClient"/> from the configured options and wires up
        /// JSON (de)serialization and trace logging.
        /// </summary>
        private RedisClient CreateClient()
        {
            var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB},MaxPoolSize={_option.MaxPoolSize}";

            var client = new RedisClient(connectionString);
            client.Serialize = obj => BusJsonSerializer.Serialize(obj);
            client.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);

            // Forward the client's notice log text to the diagnostics trace listeners.
            client.Notice += (s, e) => Trace.WriteLine(e.Log);
            return client;
        }

        // NOTE(review): everything below is commented-out prototype code kept from the original
        // file. Generic type arguments and XML doc tags appear to have been lost from it (e.g.
        // "Task>" and "IEnumerable items"), so it will not compile if simply uncommented.
        // Text that would be a runtime string literal (exception messages, Lua script bodies)
        // is left untouched. Consider deleting this region and relying on version control.

        ///// Add a single record
        ///// Hash cache key for main data storage
        ///// Set cache key for the concentrator index
        ///// ZSET cache key for the concentrator sort index
        ///// ZSet cache key for the global index grouped by concentrator collection frequency
        ///// Meter information
        ///// Optional timestamp
        //public async Task AddMeterCacheData(
        //string redisCacheKey,
        //string redisCacheFocusIndexKey,
        //string redisCacheScoresIndexKey,
        //string redisCacheGlobalIndexKey,
        //T data,
        //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
        //{
        //    // Enhanced parameter validation
        //    if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
        //        || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
        //    {
        //        throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
        //    }
        //    // Compute the combined score (category ID + timestamp)
        //    var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
        //    long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
        //    // Write the global index
        //    long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
        //    // Use a transaction to guarantee atomicity
        //    using (var trans = Instance.Multi())
        //    {
        //        // Main data storage (Hash)
        //        trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
        //        // Category index
        //        trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
        //        // Sort index uses a ZSET
        //        trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
        //        // Global index
        //        trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
        //        var results = trans.Exec();
        //        if (results == null || results.Length <= 0)
        //            throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
        //    }
        //    await Task.CompletedTask;
        //}

        ///// Add records in batch
        ///// Hash cache key for main data storage
        ///// Set cache key for the concentrator index
        ///// ZSET cache key for the concentrator sort index
        ///// ZSet cache key for the global index grouped by concentrator collection frequency
        ///// Data collection
        ///// Optional timestamp
        //public async Task BatchAddMeterData(
        //string redisCacheKey,
        //string redisCacheFocusIndexKey,
        //string redisCacheScoresIndexKey,
        //string redisCacheGlobalIndexKey,
        //IEnumerable items,
        //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
        //{
        //    if (items == null
        //        || items.Count() <=0
        //        || string.IsNullOrWhiteSpace(redisCacheKey)
        //        || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
        //    {
        //        throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
        //    }
        //    const int BATCH_SIZE = 1000; // 1000 items per batch
        //    var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
        //    foreach (var batch in items.Batch(BATCH_SIZE))
        //    {
        //        await semaphore.WaitAsync();
        //        _ = Task.Run(() =>
        //        {
        //            using (var pipe = Instance.StartPipe())
        //            {
        //                foreach (var item in batch)
        //                {
        //                    // Compute the combined score (category ID + timestamp)
        //                    var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
        //                    long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
        //                    // Write the global index
        //                    long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
        //                    // Main data storage (Hash)
        //                    pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
        //                    // Category index (Set)
        //                    pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
        //                    // Sort index uses a ZSET
        //                    pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
        //                    // Global index
        //                    pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
        //                }
        //                pipe.EndPipe();
        //            }
        //            semaphore.Release();
        //        });
        //    }
        //    await Task.CompletedTask;
        //}

        ///// Delete cached data for the specified Redis cache keys
        ///// Hash cache key for main data storage
        ///// Set cache key for the concentrator index
        ///// ZSET cache key for the concentrator sort index
        ///// ZSet cache key for the global index grouped by concentrator collection frequency
        ///// Meter information
        //public async Task RemoveMeterData(
        //string redisCacheKey,
        //string redisCacheFocusIndexKey,
        //string redisCacheScoresIndexKey,
        //string redisCacheGlobalIndexKey,
        //T data) where T : DeviceCacheBasicModel
        //{
        //    if (data == null
        //        || string.IsNullOrWhiteSpace(redisCacheKey)
        //        || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
        //    {
        //        throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
        //    }
        //    const string luaScript = @"
        //        local mainKey = KEYS[1]
        //        local focusIndexKey = KEYS[2]
        //        local scoresIndexKey = KEYS[3]
        //        local globalIndexKey = KEYS[4]
        //        local member = ARGV[1]
        //        local deleted = 0
        //        if redis.call('HDEL', mainKey, member) > 0 then
        //            deleted = 1
        //        end
        //        redis.call('SREM', focusIndexKey, member)
        //        redis.call('ZREM', scoresIndexKey, member)
        //        redis.call('ZREM', globalIndexKey, member)
        //        return deleted
        //    ";
        //    var keys = new[]
        //    {
        //        redisCacheKey,
        //        redisCacheFocusIndexKey,
        //        redisCacheScoresIndexKey,
        //        redisCacheGlobalIndexKey
        //    };
        //    var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
        //    if ((int)result == 0)
        //        throw new KeyNotFoundException("指定数据不存在");
        //}

        ///// Update cached meter information
        ///// Hash cache key for main data storage
        ///// Old Set cache key for the concentrator index
        ///// New Set cache key for the concentrator index
        ///// ZSET cache key for the concentrator sort index
        ///// ZSet cache key for the global index grouped by concentrator collection frequency
        ///// Meter information
        ///// Optional timestamp
        //public async Task UpdateMeterData(
        //string redisCacheKey,
        //string oldRedisCacheFocusIndexKey,
        //string newRedisCacheFocusIndexKey,
        //string redisCacheScoresIndexKey,
        //string redisCacheGlobalIndexKey,
        //T newData,
        //DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
        //{
        //    if (newData == null
        //        || string.IsNullOrWhiteSpace(redisCacheKey)
        //        || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
        //        || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        //        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
        //    {
        //        throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
        //    }
        //    var luaScript = @"
        //        local mainKey = KEYS[1]
        //        local oldFocusIndexKey = KEYS[2]
        //        local newFocusIndexKey = KEYS[3]
        //        local scoresIndexKey = KEYS[4]
        //        local globalIndexKey = KEYS[5]
        //        local member = ARGV[1]
        //        local newData = ARGV[2]
        //        local newScore = ARGV[3]
        //        local newGlobalScore = ARGV[4]
        //        -- 校验存在性
        //        if redis.call('HEXISTS', mainKey, member) == 0 then
        //            return 0
        //        end
        //        -- 更新主数据
        //        redis.call('HSET', mainKey, member, newData)
        //        -- 处理变更
        //        if newScore ~= '' then
        //            -- 删除旧索引
        //            redis.call('SREM', oldFocusIndexKey, member)
        //            redis.call('ZREM', scoresIndexKey, member)
        //            -- 添加新索引
        //            redis.call('SADD', newFocusIndexKey, member)
        //            redis.call('ZADD', scoresIndexKey, newScore, member)
        //        end
        //        -- 更新全局索引
        //        if newGlobalScore ~= '' then
        //            -- 删除旧索引
        //            redis.call('ZREM', globalIndexKey, member)
        //            -- 添加新索引
        //            redis.call('ZADD', globalIndexKey, newGlobalScore, member)
        //        end
        //        return 1
        //    ";
        //    var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
        //    var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
        //    var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
        //    var result = await Instance.EvalAsync(luaScript,
        //        new[]
        //        {
        //            redisCacheKey,
        //            oldRedisCacheFocusIndexKey,
        //            newRedisCacheFocusIndexKey,
        //            redisCacheScoresIndexKey,
        //            redisCacheGlobalIndexKey
        //        },
        //        new object[]
        //        {
        //            newData.MemberID,
        //            newData.Serialize(),
        //            newScoreValue.ToString() ?? "",
        //            newGlobalScore.ToString() ?? ""
        //        });
        //    if ((int)result == 0)
        //    {
        //        throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
        //    }
        //}

        //public async Task> SingleGetMeterPagedData(
        //string redisCacheKey,
        //string redisCacheScoresIndexKey,
        //int focusId,
        //int pageSize = 10,
        //int pageIndex = 1,
        //bool descending = true)
        //{
        //    // Compute the score range
        //    long minScore = (long)focusId << 32;
        //    long maxScore = ((long)focusId + 1) << 32;
        //    // Compute paging parameters
        //    int start = (pageIndex - 1) * pageSize;
        //    // Get the sorted member list
        //    var members = descending
        //        ? await Instance.ZRevRangeByScoreAsync(
        //            redisCacheScoresIndexKey,
        //            maxScore,
        //            minScore,
        //            start,
        //            pageSize)
        //        : await Instance.ZRangeByScoreAsync(
        //            redisCacheScoresIndexKey,
        //            minScore,
        //            maxScore,
        //            start,
        //            pageSize);
        //    // Fetch the actual data in batch
        //    var dataTasks = members.Select(m =>
        //        Instance.HGetAsync(redisCacheKey, m)).ToArray();
        //    await Task.WhenAll(dataTasks);
        //    // Total count optimization
        //    var total = await Instance.ZCountAsync(
        //        redisCacheScoresIndexKey,
        //        minScore,
        //        maxScore);
        //    return new BusPagedResult
        //    {
        //        Items = dataTasks.Select(t => t.Result).ToList(),
        //        TotalCount = total,
        //        PageIndex = pageIndex,
        //        PageSize = pageSize
        //    };
        //}

        //public async Task> GetFocusPagedData(
        //string redisCacheKey,
        //string redisCacheScoresIndexKey,
        //int focusId,
        //int pageSize = 10,
        //long? lastScore = null,
        //string lastMember = null,
        //bool descending = true) where T : DeviceCacheBasicModel
        //{
        //    // Compute the score range
        //    long minScore = (long)focusId << 32;
        //    long maxScore = ((long)focusId + 1) << 32;
        //    // Get the member list
        //    var members = await GetSortedMembers(
        //        redisCacheScoresIndexKey,
        //        minScore,
        //        maxScore,
        //        pageSize,
        //        lastScore,
        //        lastMember,
        //        descending);
        //    // Fetch data in batch
        //    var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems);
        //    return new BusPagedResult
        //    {
        //        Items = dataDict,
        //        TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
        //        HasNext = members.HasNext,
        //        NextScore = members.NextScore,
        //        NextMember = members.NextMember
        //    };
        //}

        //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
        //    GetSortedMembers(
        //    string zsetKey,
        //    long minScore,
        //    long maxScore,
        //    int pageSize,
        //    long? lastScore,
        //    string lastMember,
        //    bool descending)
        //{
        //    var querySize = pageSize + 1;
        //    var (startScore, exclude) = descending
        //        ? (lastScore ?? maxScore, lastMember)
        //        : (lastScore ?? minScore, lastMember);
        //    var members = descending
        //        ? await Instance.ZRevRangeByScoreAsync(
        //            zsetKey,
        //            max: startScore,
        //            min: minScore,
        //            offset: 0,
        //            count: querySize)
        //        : await Instance.ZRangeByScoreAsync(
        //            zsetKey,
        //            min: startScore,
        //            max: maxScore,
        //            offset: 0,
        //            count: querySize);
        //    var hasNext = members.Length > pageSize;
        //    var currentItems = members.Take(pageSize).ToArray();
        //    var nextCursor = currentItems.Any()
        //        ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
        //        : (null, null);
        //    return (currentItems, hasNext, nextCursor.score, nextCursor.member);
        //}

        //private async Task GetTotalCount(string zsetKey, long min, long max)
        //{
        //    // Cached count optimization
        //    var cacheKey = $"{zsetKey}_count_{min}_{max}";
        //    var cached = await Instance.GetAsync(cacheKey);
        //    if (cached.HasValue)
        //        return cached.Value;
        //    var count = await Instance.ZCountAsync(zsetKey, min, max);
        //    await Instance.SetExAsync(cacheKey, 60, count); // cache for 60 seconds
        //    return count;
        //}

        //public async Task>> BatchGetMeterPagedData(
        //string redisCacheKey,
        //string redisCacheScoresIndexKey,
        //IEnumerable focusIds,
        //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
        //{
        //    var results = new ConcurrentDictionary>();
        //    var parallelOptions = new ParallelOptions
        //    {
        //        MaxDegreeOfParallelism = Environment.ProcessorCount * 2
        //    };
        //    await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
        //    {
        //        var data = await SingleGetMeterPagedData(
        //            redisCacheKey,
        //            redisCacheScoresIndexKey,
        //            focusId,
        //            pageSizePerFocus);
        //        results.TryAdd(focusId, data);
        //    });
        //    return new Dictionary>(results);
        //}
    }
}