402 lines
14 KiB
C#
Raw Normal View History

2025-03-17 08:35:19 +08:00
using FreeRedis;
2025-04-15 16:05:07 +08:00
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
2025-04-15 16:48:35 +08:00
using JiShe.CollectBus.Common.Extensions;
2025-04-15 16:05:07 +08:00
using JiShe.CollectBus.FreeRedisProvider.Options;
2025-03-17 08:35:19 +08:00
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text.Json;
using Volo.Abp.DependencyInjection;
2025-04-15 16:48:35 +08:00
using static System.Runtime.InteropServices.JavaScript.JSType;
2025-03-17 08:35:19 +08:00
namespace JiShe.CollectBus.FreeRedisProvider
{
public class FreeRedisProvider : IFreeRedisProvider, ISingletonDependency
{
private readonly FreeRedisOptions _option;

/// <summary>
/// Creates the provider and builds the FreeRedis client once from the supplied options.
/// </summary>
/// <param name="options">Options wrapper; its <c>Configuration</c> and <c>DefaultDB</c> values form the connection string.</param>
public FreeRedisProvider(IOptions<FreeRedisOptions> options)
{
    _option = options.Value;
    // Build the real client directly instead of first allocating a placeholder
    // RedisClient(string.Empty) that was immediately replaced and never disposed.
    Instance = CreateClient();
}

/// <summary>
/// The FreeRedis client used by every method of this provider.
/// </summary>
public RedisClient Instance { get; set; }

/// <summary>
/// Gets the FreeRedis client.
/// </summary>
/// <returns>
/// The client held in <see cref="Instance"/>. A new client is created only when
/// <see cref="Instance"/> has been set to <c>null</c>.
/// </returns>
public IRedisClient GetInstance()
{
    // Previously every call constructed a new RedisClient and overwrote Instance without
    // disposing the old one, leaking a client per call. Reuse the existing client.
    Instance ??= CreateClient();
    return Instance;
}

/// <summary>
/// Builds a client from the configured connection string and wires the JSON
/// serializer hooks and the trace logger.
/// </summary>
private RedisClient CreateClient()
{
    var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}";
    var client = new RedisClient(connectionString);
    client.Serialize = obj => BusJsonSerializer.Serialize(obj);
    client.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
    client.Notice += (s, e) => Trace.WriteLine(e.Log);
    return client;
}
2025-04-15 15:49:51 +08:00
2025-04-15 16:48:35 +08:00
/// <summary>
/// Adds a single record to the cache and to its three indexes inside one MULTI/EXEC transaction.
/// </summary>
/// <typeparam name="T">Cached model type.</typeparam>
/// <param name="redisCacheKey">Hash key holding the serialized records, keyed by <c>UniqueId</c>.</param>
/// <param name="redisCacheFocusIndexKey">Set key used as the concentrator index.</param>
/// <param name="redisCacheScoresIndexKey">Sorted-set key used as the per-concentrator ordering index.</param>
/// <param name="redisCacheGlobalIndexKey">Sorted-set key used as the global index (score = Unix milliseconds).</param>
/// <param name="data">Record to store.</param>
/// <param name="timestamp">Optional timestamp; defaults to the current UTC time.</param>
/// <returns>A completed task; the Redis calls themselves run synchronously.</returns>
/// <exception cref="ArgumentException"><paramref name="data"/> is null or any key is blank.</exception>
/// <exception cref="Exception">The transaction returned no results.</exception>
public async Task AddMeterCacheData<T>(
    string redisCacheKey,
    string redisCacheFocusIndexKey,
    string redisCacheScoresIndexKey,
    string redisCacheGlobalIndexKey,
    T data,
    DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
    if (data == null
        || string.IsNullOrWhiteSpace(redisCacheKey)
        || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
    {
        throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
    }

    var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;

    // Composite score: FocusId in the high 32 bits, Unix seconds in the low 32 bits.
    // The previous (uint)Ticks cast kept only the low 32 bits of a 100 ns counter, which
    // wraps roughly every 7 minutes and therefore carried no usable ordering.
    // NOTE(review): sorted-set scores are doubles, so this is exact only while
    // FocusId stays below 2^21 - confirm the id range.
    long scoreValue = ((long)data.FocusId << 32) | (actualTimestamp.ToUnixTimeSeconds() & 0xFFFFFFFFL);

    // Global index score.
    long globalScore = actualTimestamp.ToUnixTimeMilliseconds();

    // Queue all four writes in one transaction.
    using (var trans = Instance.Multi())
    {
        // Main record (hash).
        trans.HSet(redisCacheKey, data.UniqueId, data.Serialize());
        // Concentrator index (set).
        trans.SAdd(redisCacheFocusIndexKey, data.UniqueId);
        // Ordering index (sorted set).
        trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.UniqueId);
        // Global index (sorted set).
        trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.UniqueId);

        var results = trans.Exec();
        if (results == null || results.Length <= 0)
        {
            throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
        }
    }

    await Task.CompletedTask;
}
2025-04-15 16:48:35 +08:00
/// <summary>
/// Adds many records to the cache and its three indexes, pipelining them in batches of 1000
/// with a bounded number of batches in flight.
/// </summary>
/// <typeparam name="T">Cached model type.</typeparam>
/// <param name="redisCacheKey">Hash key holding the serialized records, keyed by <c>UniqueId</c>.</param>
/// <param name="redisCacheFocusIndexKey">Set key used as the concentrator index.</param>
/// <param name="redisCacheScoresIndexKey">Sorted-set key used as the per-concentrator ordering index.</param>
/// <param name="redisCacheGlobalIndexKey">Sorted-set key used as the global index (score = Unix milliseconds).</param>
/// <param name="items">Records to store.</param>
/// <param name="timestamp">Optional timestamp applied to every record; defaults to the current UTC time per record.</param>
/// <returns>A task that completes once every batch has been written; a failed batch faults the task.</returns>
/// <exception cref="ArgumentException"><paramref name="items"/> is null or any key is blank.</exception>
public async Task BatchAddMeterData<T>(
    string redisCacheKey,
    string redisCacheFocusIndexKey,
    string redisCacheScoresIndexKey,
    string redisCacheGlobalIndexKey,
    IEnumerable<T> items,
    DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
    if (items == null
        || string.IsNullOrWhiteSpace(redisCacheKey)
        || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
        || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
        || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
    {
        throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
    }

    const int BATCH_SIZE = 1000; // records per pipeline

    // Not disposed on purpose: a worker may still call Release() if enumeration of
    // `items` throws part-way, and the wait handle is never materialized.
    var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
    var pending = new List<Task>();

    foreach (var batch in items.Batch(BATCH_SIZE))
    {
        // Materialize before handing the batch to another thread so the worker never
        // touches the source enumerator.
        var chunk = batch.ToList();

        await semaphore.WaitAsync();
        pending.Add(Task.Run(() =>
        {
            try
            {
                using (var pipe = Instance.StartPipe())
                {
                    foreach (var item in chunk)
                    {
                        var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;

                        // FocusId in the high 32 bits, Unix seconds in the low 32 bits.
                        // The previous (uint)Ticks cast wrapped roughly every 7 minutes.
                        long scoreValue = ((long)item.FocusId << 32) | (actualTimestamp.ToUnixTimeSeconds() & 0xFFFFFFFFL);
                        long globalScore = actualTimestamp.ToUnixTimeMilliseconds();

                        // Main record (hash).
                        pipe.HSet(redisCacheKey, item.UniqueId, item.Serialize());
                        // Concentrator index (set).
                        pipe.SAdd(redisCacheFocusIndexKey, item.UniqueId);
                        // Ordering index (sorted set).
                        pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.UniqueId);
                        // Global index (sorted set).
                        pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.UniqueId);
                    }

                    pipe.EndPipe();
                }
            }
            finally
            {
                // Always give the slot back; otherwise a failing batch would
                // permanently shrink the pool and eventually block WaitAsync forever.
                semaphore.Release();
            }
        }));
    }

    // Previously the workers were fire-and-forget: the method returned before the data
    // was written and any Redis error was silently dropped.
    await Task.WhenAll(pending);
}
/// <summary>
/// Replaces a cached record and, when a new category or timestamp is supplied,
/// moves it between category sets and rewrites its ordering score. The hash/set/sorted-set
/// changes run in one Lua script; the global index write is a separate call afterwards.
/// </summary>
/// <typeparam name="T">Cached model type.</typeparam>
/// <param name="redisCacheKey">Hash key holding the records. The ordering index is read as <c>{redisCacheKey}_scores</c>.</param>
/// <param name="oldCategoryIndexKey">Set key the member is removed from when the score changes.</param>
/// <param name="newCategoryIndexKey">Set key the member is added to when the score changes.</param>
/// <param name="memberId">Unique member id in the form <c>"categoryId:GUID"</c>; the part before the first ':' is parsed as an integer.</param>
/// <param name="newData">Replacement record.</param>
/// <param name="newCategoryId">Optional new category id.</param>
/// <param name="newTimestamp">Optional new timestamp; when set, the <c>global_data_all</c> sorted set is updated too.</param>
/// <exception cref="ArgumentException"><paramref name="memberId"/> is blank.</exception>
/// <exception cref="ArgumentNullException"><paramref name="newData"/> is null.</exception>
/// <exception cref="KeyNotFoundException">The member does not exist in the hash.</exception>
public async Task UpdateMeterData<T>(
    string redisCacheKey,
    string oldCategoryIndexKey,
    string newCategoryIndexKey,
    string memberId,
    T newData,
    int? newCategoryId = null,
    DateTimeOffset? newTimestamp = null)
{
    if (string.IsNullOrWhiteSpace(memberId))
    {
        throw new ArgumentException("Invalid member ID");
    }

    if (newData == null)
    {
        throw new ArgumentNullException(nameof(newData));
    }

    // Script text is kept exactly as before (including its left alignment).
    var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local oldIndex = KEYS[3]
local newIndex = KEYS[4]
local member = ARGV[1]
local newData = ARGV[2]
local newScore = ARGV[3]
--
if redis.call('HEXISTS', mainKey, member) == 0 then
return 0
end
--
redis.call('HSET', mainKey, member, newData)
--
if newScore ~= '' then
--
redis.call('SREM', oldIndex, member)
--
redis.call('ZADD', scoreKey, newScore, member)
--
redis.call('SADD', newIndex, member)
end
return 1
";

    // A new score is only needed when the category or the timestamp changes.
    long? newScoreValue = null;
    if (newCategoryId.HasValue || newTimestamp.HasValue)
    {
        var parts = memberId.Split(':');
        var oldCategoryId = int.Parse(parts[0], System.Globalization.CultureInfo.InvariantCulture);
        var actualCategoryId = newCategoryId ?? oldCategoryId;
        var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;

        // Category id in the high 32 bits, Unix seconds in the low 32 bits.
        // The previous (uint)Ticks cast wrapped roughly every 7 minutes.
        newScoreValue = ((long)actualCategoryId << 32) | (actualTimestamp.ToUnixTimeSeconds() & 0xFFFFFFFFL);
    }

    // NOTE(review): the score key and the global key below are derived/hard-coded here,
    // whereas AddMeterCacheData takes them as parameters - confirm callers use matching names.
    var result = await Instance.EvalAsync(luaScript,
        new[]
        {
            redisCacheKey,
            $"{redisCacheKey}_scores",
            oldCategoryIndexKey,
            newCategoryIndexKey
        },
        new[]
        {
            memberId,
            newData.Serialize(),
            newScoreValue?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""
        });

    // Redis integer replies arrive boxed as Int64, so the old (int)result unboxing threw
    // InvalidCastException. Check existence BEFORE touching the global index so a missing
    // member no longer leaves an orphan entry behind.
    if (Convert.ToInt64(result) == 0)
    {
        throw new KeyNotFoundException("指定数据不存在");
    }

    // Keep the global index in step when the timestamp changed.
    if (newTimestamp.HasValue)
    {
        long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds();
        await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId);
    }
}
/// <summary>
/// Returns one page of records for a category, ordered by the composite score stored in
/// the <c>{redisCacheKey}_scores</c> sorted set.
/// </summary>
/// <typeparam name="T">Cached model type.</typeparam>
/// <param name="redisCacheKey">Hash key holding the records.</param>
/// <param name="redisCacheIndexKey">Not used by this method; kept for signature compatibility.</param>
/// <param name="categoryId">Category whose score range (<c>categoryId &lt;&lt; 32</c> plus a 32-bit low part) is queried.</param>
/// <param name="pageSize">Records per page; must be at least 1.</param>
/// <param name="pageIndex">1-based page number; must be at least 1.</param>
/// <param name="descending">True for highest score first.</param>
/// <returns>The page items together with the total number of members in the category range.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pageSize"/> or <paramref name="pageIndex"/> is below 1.</exception>
public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
    string redisCacheKey,
    string redisCacheIndexKey,
    int categoryId,
    int pageSize = 10,
    int pageIndex = 1,
    bool descending = true)
{
    if (pageSize < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(pageSize));
    }

    if (pageIndex < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(pageIndex));
    }

    var scoreKey = $"{redisCacheKey}_scores";

    // Both bounds are inclusive on the Redis side. The old upper bound,
    // (categoryId + 1) << 32, is the FIRST score of the next category, so it could pull in
    // a foreign member (and overflowed for int.MaxValue). Use the last score of this category.
    long minScore = (long)categoryId << 32;
    long maxScore = minScore | 0xFFFFFFFFL;

    int offset = (pageIndex - 1) * pageSize;

    // Ordered member ids for the requested page.
    string[] members;
    if (descending)
    {
        members = await Instance.ZRevRangeByScoreAsync(scoreKey, maxScore, minScore, offset, pageSize);
    }
    else
    {
        members = await Instance.ZRangeByScoreAsync(scoreKey, minScore, maxScore, offset, pageSize);
    }

    // Fetch the records concurrently; WhenAll preserves the member order.
    var records = await Task.WhenAll(members.Select(m => Instance.HGetAsync<T>(redisCacheKey, m)));

    // Total members in the category range.
    var total = await Instance.ZCountAsync(scoreKey, minScore, maxScore);

    return new BusPagedResult<T>
    {
        Items = records.ToList(),
        TotalCount = total,
        PageIndex = pageIndex,
        PageSize = pageSize
    };
}
/// <summary>
/// Removes a member from the record hash, the <c>{redisCacheKey}_scores</c> sorted set and
/// the given index set in a single Lua script.
/// </summary>
/// <typeparam name="T">Cached model type (not used by the body).</typeparam>
/// <param name="redisCacheKey">Hash key holding the records.</param>
/// <param name="redisCacheIndexKey">Set key the member is removed from.</param>
/// <param name="uniqueId">Unique id of the member to delete.</param>
/// <exception cref="ArgumentException"><paramref name="uniqueId"/> is blank.</exception>
/// <exception cref="Exception">The script did not return 1.</exception>
public async Task RemoveMeterZSetData<T>(
    string redisCacheKey,
    string redisCacheIndexKey,
    string uniqueId)
{
    if (string.IsNullOrWhiteSpace(uniqueId))
    {
        throw new ArgumentException("Invalid member ID");
    }

    // Script text is kept exactly as before (including its left alignment).
    // NOTE(review): the global index sorted set is not cleaned up here - confirm whether
    // stale global entries are acceptable.
    var luaScript = @"
local mainKey = KEYS[1]
local scoreKey = KEYS[2]
local indexKey = KEYS[3]
local member = ARGV[1]
redis.call('HDEL', mainKey, member)
redis.call('ZREM', scoreKey, member)
redis.call('SREM', indexKey, member)
return 1
";

    var keys = new[]
    {
        redisCacheKey,
        $"{redisCacheKey}_scores",
        redisCacheIndexKey
    };

    var result = await Instance.EvalAsync(luaScript,
        keys,
        new[] { uniqueId });

    // Redis integer replies arrive boxed as Int64; the old (int)result unboxing threw
    // InvalidCastException even though the delete had already succeeded.
    if (Convert.ToInt64(result) != 1)
    {
        throw new Exception("删除操作失败");
    }
}
2025-04-15 16:48:35 +08:00
/// <summary>
/// Cursor-based paging over the <c>global_data_all</c> sorted set, returning the matching
/// records from the hash at <paramref name="redisCacheKey"/>.
/// </summary>
/// <typeparam name="T">Cached model type.</typeparam>
/// <param name="redisCacheKey">Hash key holding the records.</param>
/// <param name="pageSize">Records per page; must be at least 1.</param>
/// <param name="lastScore">Score returned as <c>NextScore</c> by the previous page, or null for the first page.</param>
/// <param name="lastMember">Member returned as <c>NextMember</c> by the previous page, or null for the first page.</param>
/// <param name="descending">True for highest score first.</param>
/// <returns>The page items, whether another page exists, and the cursor for the next call.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pageSize"/> is below 1.</exception>
public async Task<BusCacheGlobalPagedResult<T>> GetGlobalPagedData<T>(
    string redisCacheKey,
    int pageSize = 10,
    long? lastScore = null,
    string lastMember = null,
    bool descending = true)
{
    if (pageSize < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(pageSize));
    }

    // NOTE(review): hard-coded, while AddMeterCacheData writes to a caller-supplied global
    // key - confirm callers pass "global_data_all" there.
    const string zsetKey = "global_data_all";

    string[] members = null;

    // Preferred path: resume right after lastMember by rank. The old code ignored
    // lastMember and used an inclusive score bound, so every following page started with
    // the previous page's last item again and could never advance past a run of members
    // that share one score.
    if (!string.IsNullOrEmpty(lastMember))
    {
        long? lastRank = descending
            ? await Instance.ZRevRankAsync(zsetKey, lastMember)
            : await Instance.ZRankAsync(zsetKey, lastMember);

        if (lastRank.HasValue)
        {
            long firstRank = lastRank.Value + 1;
            long lastWantedRank = firstRank + pageSize; // inclusive stop => pageSize + 1 members
            members = descending
                ? await Instance.ZRevRangeAsync(zsetKey, firstRank, lastWantedRank)
                : await Instance.ZRangeAsync(zsetKey, firstRank, lastWantedRank);
        }
    }

    // Fallback: first page, or the cursor member no longer exists - use the score bound.
    if (members == null)
    {
        if (descending)
        {
            members = await Instance.ZRevRangeByScoreAsync(
                zsetKey,
                max: lastScore ?? long.MaxValue,
                min: 0,
                offset: 0,
                count: pageSize + 1);
        }
        else
        {
            members = await Instance.ZRangeByScoreAsync(
                zsetKey,
                min: lastScore ?? 0,
                max: long.MaxValue,
                offset: 0,
                count: pageSize + 1);
        }
    }

    // One extra member was requested purely to detect a following page.
    bool hasNext = members.Length > pageSize;
    var actualMembers = members.Take(pageSize).ToArray();

    // Fetch the records concurrently; WhenAll preserves the member order.
    var records = await Task.WhenAll(
        actualMembers.Select(m => Instance.HGetAsync<T>(redisCacheKey, m)));

    // Cursor for the next call = last member of this page and its score.
    (long? nextScore, string nextMember) = actualMembers.Any()
        ? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
        : (null, null);

    return new BusCacheGlobalPagedResult<T>
    {
        Items = records.ToList(),
        HasNext = hasNext,
        NextScore = nextScore,
        NextMember = nextMember
    };
}
/// <summary>
/// Builds the paging cursor for a member: its current score in the sorted set (truncated
/// to a long, or null when the member has no score) paired with the member itself.
/// </summary>
/// <param name="zsetKey">Sorted-set key to read the score from.</param>
/// <param name="lastMember">Member whose score is looked up.</param>
/// <param name="descending">Not used by the body.</param>
private async Task<(long? score, string member)> GetNextCursor(
    string zsetKey,
    string lastMember,
    bool descending)
{
    var memberScore = await Instance.ZScoreAsync(zsetKey, lastMember);

    long? cursorScore = null;
    if (memberScore.HasValue)
    {
        cursorScore = (long)memberScore.Value;
    }

    return (cursorScore, lastMember);
}
2025-03-17 08:35:19 +08:00
}
}