Compare commits
No commits in common. "13e986168e5dd00e04270ba6b94062bbad744cfa" and "78127f7cea9975aa131ba007cf6b3d46ef6b9494" have entirely different histories.
13e986168e
...
78127f7cea
@ -113,7 +113,7 @@ namespace JiShe.CollectBus.Application.Contracts
|
|||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
/// 通过ZSET索引获取数据
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="T"></typeparam>
|
/// <typeparam name="T"></typeparam>
|
||||||
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
||||||
@ -133,17 +133,6 @@ namespace JiShe.CollectBus.Application.Contracts
|
|||||||
where T : DeviceCacheBasicModel;
|
where T : DeviceCacheBasicModel;
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 优化后的分页获取方法(支持百万级数据)
|
|
||||||
/// </summary>
|
|
||||||
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
|
|
||||||
string redisHashCacheKey,
|
|
||||||
string redisZSetScoresIndexCacheKey,
|
|
||||||
int pageSize = 1000,
|
|
||||||
decimal? lastScore = null,
|
|
||||||
string lastMember = null,
|
|
||||||
bool descending = true) where T : DeviceCacheBasicModel;
|
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 游标分页查询
|
///// 游标分页查询
|
||||||
///// </summary>
|
///// </summary>
|
||||||
@ -160,28 +149,28 @@ namespace JiShe.CollectBus.Application.Contracts
|
|||||||
// string excludeMember,
|
// string excludeMember,
|
||||||
// bool descending);
|
// bool descending);
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 批量获取指定分页的数据
|
///// 批量获取指定分页的数据
|
||||||
///// </summary>
|
///// </summary>
|
||||||
///// <typeparam name="T"></typeparam>
|
///// <typeparam name="T"></typeparam>
|
||||||
///// <param name="redisHashCacheKey">Hash表缓存key</param>
|
///// <param name="redisHashCacheKey">Hash表缓存key</param>
|
||||||
///// <param name="members">Hash表字段集合</param>
|
///// <param name="members">Hash表字段集合</param>
|
||||||
///// <returns></returns>
|
///// <returns></returns>
|
||||||
//Task<Dictionary<string, T>> BatchGetData<T>(
|
//Task<Dictionary<string, T>> BatchGetData<T>(
|
||||||
// string redisHashCacheKey,
|
// string redisHashCacheKey,
|
||||||
// IEnumerable<string> members)
|
// IEnumerable<string> members)
|
||||||
// where T : DeviceCacheBasicModel;
|
// where T : DeviceCacheBasicModel;
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 获取下一页游标
|
///// 获取下一页游标
|
||||||
///// </summary>
|
///// </summary>
|
||||||
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
|
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
|
||||||
///// <param name="lastMember">最后一个唯一标识</param>
|
///// <param name="lastMember">最后一个唯一标识</param>
|
||||||
///// <param name="descending">排序方式</param>
|
///// <param name="descending">排序方式</param>
|
||||||
///// <returns></returns>
|
///// <returns></returns>
|
||||||
//Task<decimal?> GetNextScore(
|
//Task<decimal?> GetNextScore(
|
||||||
// string redisZSetScoresIndexCacheKey,
|
// string redisZSetScoresIndexCacheKey,
|
||||||
// string lastMember,
|
// string lastMember,
|
||||||
// bool descending);
|
// bool descending);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,6 @@ using JiShe.CollectBus.Common.Models;
|
|||||||
using JiShe.CollectBus.FreeRedisProvider;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -70,13 +69,13 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
using (var trans = Instance.Multi())
|
using (var trans = Instance.Multi())
|
||||||
{
|
{
|
||||||
// 主数据存储Hash
|
// 主数据存储Hash
|
||||||
trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
|
trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize());
|
||||||
|
|
||||||
// 集中器号分组索引Set缓存
|
// 集中器号分组索引Set缓存
|
||||||
trans.SAdd(redisSetIndexCacheKey, data.MemberId);
|
trans.SAdd(redisSetIndexCacheKey, data.MemberID);
|
||||||
|
|
||||||
// 集中器与表计信息排序索引ZSET缓存Key
|
// 集中器与表计信息排序索引ZSET缓存Key
|
||||||
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
|
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID);
|
||||||
|
|
||||||
var results = trans.Exec();
|
var results = trans.Exec();
|
||||||
|
|
||||||
@ -128,13 +127,13 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
foreach (var item in batch)
|
foreach (var item in batch)
|
||||||
{
|
{
|
||||||
// 主数据存储Hash
|
// 主数据存储Hash
|
||||||
pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
|
pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize());
|
||||||
|
|
||||||
// Set索引缓存
|
// Set索引缓存
|
||||||
pipe.SAdd(redisSetIndexCacheKey, item.MemberId);
|
pipe.SAdd(redisSetIndexCacheKey, item.MemberID);
|
||||||
|
|
||||||
// ZSET索引缓存Key
|
// ZSET索引缓存Key
|
||||||
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
|
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID);
|
||||||
}
|
}
|
||||||
pipe.EndPipe();
|
pipe.EndPipe();
|
||||||
}
|
}
|
||||||
@ -192,11 +191,11 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
redisZSetScoresIndexCacheKey
|
redisZSetScoresIndexCacheKey
|
||||||
};
|
};
|
||||||
|
|
||||||
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
|
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
|
||||||
|
|
||||||
if ((int)result == 0)
|
if ((int)result == 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
|
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,13 +247,13 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
},
|
},
|
||||||
new object[]
|
new object[]
|
||||||
{
|
{
|
||||||
newData.MemberId,
|
newData.MemberID,
|
||||||
newData.Serialize()
|
newData.Serialize()
|
||||||
});
|
});
|
||||||
|
|
||||||
if ((int)result == 0)
|
if ((int)result == 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
|
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -328,7 +327,7 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
},
|
},
|
||||||
new object[]
|
new object[]
|
||||||
{
|
{
|
||||||
newData.MemberId,
|
newData.MemberID,
|
||||||
oldMemberId,
|
oldMemberId,
|
||||||
newData.Serialize(),
|
newData.Serialize(),
|
||||||
newData.ScoreValue.ToString() ?? "",
|
newData.ScoreValue.ToString() ?? "",
|
||||||
@ -336,7 +335,7 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
|
|
||||||
if ((int)result == 0)
|
if ((int)result == 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
|
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -366,243 +365,8 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
throw new Exception();
|
throw new Exception();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 优化后的分页获取方法(支持百万级数据)
|
/// 通过ZSET索引获取数据
|
||||||
/// </summary>
|
|
||||||
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
|
|
||||||
string redisHashCacheKey,
|
|
||||||
string redisZSetScoresIndexCacheKey,
|
|
||||||
int pageSize = 1000,
|
|
||||||
decimal? lastScore = null,
|
|
||||||
string lastMember = null,
|
|
||||||
bool descending = true) where T : DeviceCacheBasicModel
|
|
||||||
{
|
|
||||||
// 参数校验
|
|
||||||
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
|
||||||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
|
||||||
{
|
|
||||||
_logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
|
|
||||||
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
||||||
}
|
|
||||||
|
|
||||||
pageSize = Math.Clamp(pageSize, 1, 10000);
|
|
||||||
|
|
||||||
const string luaScript = @"
|
|
||||||
local command = ARGV[1]
|
|
||||||
local range_start = ARGV[2]
|
|
||||||
local range_end = ARGV[3]
|
|
||||||
local limit = tonumber(ARGV[4])
|
|
||||||
local last_score = ARGV[5]
|
|
||||||
local last_member = ARGV[6]
|
|
||||||
|
|
||||||
-- 获取扩展数据(3倍分页大小)
|
|
||||||
local members
|
|
||||||
if command == 'ZRANGEBYSCORE' then
|
|
||||||
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
|
|
||||||
'WITHSCORES', 'LIMIT', 0, limit * 5)
|
|
||||||
else
|
|
||||||
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
|
|
||||||
'WITHSCORES', 'LIMIT', 0, limit * 5)
|
|
||||||
end
|
|
||||||
|
|
||||||
-- 精确分页过滤
|
|
||||||
local filtered = {}
|
|
||||||
local count = 0
|
|
||||||
local start_index = 1
|
|
||||||
|
|
||||||
-- 存在锚点时寻找起始位置
|
|
||||||
if last_score ~= '' and last_member ~= '' then
|
|
||||||
for i=1,#members,2 do
|
|
||||||
local score = members[i+1]
|
|
||||||
local member = members[i]
|
|
||||||
|
|
||||||
if command == 'ZRANGEBYSCORE' then
|
|
||||||
if tonumber(score) > tonumber(last_score) then
|
|
||||||
start_index = i
|
|
||||||
break
|
|
||||||
elseif tonumber(score) == tonumber(last_score) then
|
|
||||||
if member > last_member then
|
|
||||||
start_index = i
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
else
|
|
||||||
if tonumber(score) < tonumber(last_score) then
|
|
||||||
start_index = i
|
|
||||||
break
|
|
||||||
elseif tonumber(score) == tonumber(last_score) then
|
|
||||||
if member < last_member then
|
|
||||||
start_index = i
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
-- 收集有效数据
|
|
||||||
for i=start_index,#members,2 do
|
|
||||||
if count >= limit then break end
|
|
||||||
table.insert(filtered, members[i])
|
|
||||||
table.insert(filtered, members[i+1])
|
|
||||||
count = count + 1
|
|
||||||
end
|
|
||||||
|
|
||||||
-- 提取有效数据
|
|
||||||
local result_members = {}
|
|
||||||
local result_scores = {}
|
|
||||||
for i=1,#filtered,2 do
|
|
||||||
table.insert(result_members, filtered[i])
|
|
||||||
table.insert(result_scores, filtered[i+1])
|
|
||||||
end
|
|
||||||
|
|
||||||
if #result_members == 0 then return {0,{},{},{}} end
|
|
||||||
|
|
||||||
-- 获取Hash数据
|
|
||||||
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
|
||||||
return {#result_members, result_members, result_scores, hash_data}";
|
|
||||||
|
|
||||||
// 构造查询范围(包含等于)
|
|
||||||
string rangeStart, rangeEnd;
|
|
||||||
if (descending)
|
|
||||||
{
|
|
||||||
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
|
|
||||||
rangeEnd = "-inf";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
|
|
||||||
rangeEnd = "+inf";
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var scriptResult = (object[])await Instance.EvalAsync(
|
|
||||||
luaScript,
|
|
||||||
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
|
|
||||||
new object[]
|
|
||||||
{
|
|
||||||
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
|
|
||||||
rangeStart,
|
|
||||||
rangeEnd,
|
|
||||||
pageSize,
|
|
||||||
lastScore?.ToString() ?? "",
|
|
||||||
lastMember ?? ""
|
|
||||||
});
|
|
||||||
|
|
||||||
var itemCount = (long)scriptResult[0];
|
|
||||||
if (itemCount == 0)
|
|
||||||
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
||||||
|
|
||||||
// 处理结果集
|
|
||||||
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
|
|
||||||
var scores = ((object[])scriptResult[2]).Cast<string>()
|
|
||||||
.Select(decimal.Parse).ToList();
|
|
||||||
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
|
||||||
|
|
||||||
var validItems = members.AsParallel()
|
|
||||||
.Select((m, i) =>
|
|
||||||
{
|
|
||||||
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
|
|
||||||
catch { return null; }
|
|
||||||
})
|
|
||||||
.Where(x => x != null)
|
|
||||||
.ToList();
|
|
||||||
|
|
||||||
// 精确分页控制
|
|
||||||
var hasNext = validItems.Count >= pageSize;
|
|
||||||
var actualItems = validItems.Take(pageSize).ToList();
|
|
||||||
|
|
||||||
// 计算下一页锚点(必须基于原始排序)
|
|
||||||
decimal? nextScore = null;
|
|
||||||
string nextMember = null;
|
|
||||||
if (hasNext && actualItems.Count > 0)
|
|
||||||
{
|
|
||||||
var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
|
|
||||||
nextScore = scores[lastValidIndex];
|
|
||||||
nextMember = members[lastValidIndex];
|
|
||||||
}
|
|
||||||
|
|
||||||
return new BusCacheGlobalPagedResult<T>
|
|
||||||
{
|
|
||||||
Items = actualItems,
|
|
||||||
HasNext = hasNext,
|
|
||||||
NextScore = nextScore,
|
|
||||||
NextMember = nextMember,
|
|
||||||
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
|
|
||||||
PageSize = pageSize
|
|
||||||
};
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "分页查询异常");
|
|
||||||
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 并行分页导出方法(百万级数据支持)
|
|
||||||
/// </summary>
|
|
||||||
public async Task<List<T>> FullExportParallel<T>(
|
|
||||||
string hashKey,
|
|
||||||
string zsetKey,
|
|
||||||
int parallelDegree = 10,
|
|
||||||
int pageSize = 5000) where T : DeviceCacheBasicModel
|
|
||||||
{
|
|
||||||
var result = new ConcurrentBag<T>();
|
|
||||||
var totalCount = await GetTotalCount(zsetKey);
|
|
||||||
var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize);
|
|
||||||
|
|
||||||
var semaphore = new SemaphoreSlim(parallelDegree);
|
|
||||||
var tasks = new List<Task>();
|
|
||||||
|
|
||||||
decimal? lastScore = null;
|
|
||||||
string lastMember = null;
|
|
||||||
var isDescending = true;
|
|
||||||
|
|
||||||
for (int page = 0; page < totalPages; page++)
|
|
||||||
{
|
|
||||||
await semaphore.WaitAsync();
|
|
||||||
|
|
||||||
tasks.Add(Task.Run(async () =>
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var pageResult = await GetAllPagedData<T>(
|
|
||||||
hashKey,
|
|
||||||
zsetKey,
|
|
||||||
pageSize,
|
|
||||||
lastScore,
|
|
||||||
lastMember,
|
|
||||||
isDescending);
|
|
||||||
|
|
||||||
foreach (var item in pageResult.Items)
|
|
||||||
{
|
|
||||||
result.Add(item);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新分页锚点
|
|
||||||
if (pageResult.HasNext)
|
|
||||||
{
|
|
||||||
lastScore = pageResult.NextScore;
|
|
||||||
lastMember = pageResult.NextMember;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
semaphore.Release();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
await Task.WhenAll(tasks);
|
|
||||||
return result.ToList();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="T"></typeparam>
|
/// <typeparam name="T"></typeparam>
|
||||||
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
|
||||||
@ -621,81 +385,79 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
bool descending = true)
|
bool descending = true)
|
||||||
where T : DeviceCacheBasicModel
|
where T : DeviceCacheBasicModel
|
||||||
{
|
{
|
||||||
// 参数校验增强
|
// 参数校验
|
||||||
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
|
||||||
|
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
|
_logger.LogError("参数异常: HashKey或ZSetKey为空");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pageSize < 1 || pageSize > 10000)
|
if (pageSize < 1 || pageSize > 10000)
|
||||||
{
|
throw new ArgumentException("分页大小应在1-10000之间");
|
||||||
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var luaScript = @"
|
var luaScript = @"
|
||||||
local command = ARGV[1]
|
local command = ARGV[1]
|
||||||
local range_start = ARGV[2]
|
local range_start = ARGV[2]
|
||||||
local range_end = ARGV[3]
|
local range_end = ARGV[3]
|
||||||
local limit = tonumber(ARGV[4])
|
local limit = tonumber(ARGV[4])
|
||||||
local last_score = ARGV[5]
|
local last_score = ARGV[5]
|
||||||
local last_member = ARGV[6]
|
local last_member = ARGV[6]
|
||||||
|
|
||||||
-- 获取原始数据
|
-- 获取原始数据
|
||||||
local members
|
local members
|
||||||
if command == 'ZRANGEBYSCORE' then
|
if command == 'ZRANGEBYSCORE' then
|
||||||
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
||||||
else
|
else
|
||||||
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- 过滤数据
|
-- 过滤数据
|
||||||
local filtered_members = {}
|
local filtered_members = {}
|
||||||
local count = 0
|
local count = 0
|
||||||
for i = 1, #members, 2 do
|
for i = 1, #members, 2 do
|
||||||
local member = members[i]
|
local member = members[i]
|
||||||
local score = members[i+1]
|
local score = members[i+1]
|
||||||
local include = true
|
local include = true
|
||||||
if last_score ~= '' and last_member ~= '' then
|
if last_score ~= '' and last_member ~= '' then
|
||||||
if command == 'ZRANGEBYSCORE' then
|
if command == 'ZRANGEBYSCORE' then
|
||||||
-- 升序:score > last_score 或 (score == last_score 且 member > last_member)
|
-- 升序:score > last_score 或 (score == last_score 且 member > last_member)
|
||||||
if score == last_score then
|
if score == last_score then
|
||||||
include = member > last_member
|
include = member > last_member
|
||||||
else
|
else
|
||||||
include = tonumber(score) > tonumber(last_score)
|
include = tonumber(score) > tonumber(last_score)
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
-- 降序:score < last_score 或 (score == last_score 且 member < last_member)
|
-- 降序:score < last_score 或 (score == last_score 且 member < last_member)
|
||||||
if score == last_score then
|
if score == last_score then
|
||||||
include = member < last_member
|
include = member < last_member
|
||||||
else
|
else
|
||||||
include = tonumber(score) < tonumber(last_score)
|
include = tonumber(score) < tonumber(last_score)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
if include then
|
if include then
|
||||||
table.insert(filtered_members, member)
|
table.insert(filtered_members, member)
|
||||||
table.insert(filtered_members, score)
|
table.insert(filtered_members, score)
|
||||||
count = count + 1
|
count = count + 1
|
||||||
if count >= limit then
|
if count >= limit then
|
||||||
break
|
break
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
-- 提取有效数据
|
-- 提取有效数据
|
||||||
local result_members, result_scores = {}, {}
|
local result_members, result_scores = {}, {}
|
||||||
for i=1,#filtered_members,2 do
|
for i=1,#filtered_members,2 do
|
||||||
table.insert(result_members, filtered_members[i])
|
table.insert(result_members, filtered_members[i])
|
||||||
table.insert(result_scores, filtered_members[i+1])
|
table.insert(result_scores, filtered_members[i+1])
|
||||||
end
|
end
|
||||||
|
|
||||||
if #result_members == 0 then return {0,{},{},{}} end
|
if #result_members == 0 then return {0,{},{},{}} end
|
||||||
|
|
||||||
-- 获取Hash数据
|
-- 获取Hash数据
|
||||||
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
|
||||||
return {#result_members, result_members, result_scores, hash_data}";
|
return {#result_members, result_members, result_scores, hash_data}";
|
||||||
|
|
||||||
// 调整范围构造逻辑(移除排他符号)
|
// 调整范围构造逻辑(移除排他符号)
|
||||||
string rangeStart, rangeEnd;
|
string rangeStart, rangeEnd;
|
||||||
@ -730,19 +492,25 @@ namespace JiShe.CollectBus.RedisDataCache
|
|||||||
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
|
var scores = ((object[])scriptResult[2]).Cast<string>().Select(decimal.Parse).ToList();
|
||||||
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
|
||||||
|
|
||||||
var validItems = members.AsParallel()
|
var validItems = members.Select((m, i) =>
|
||||||
.Select((m, i) =>
|
|
||||||
{
|
{
|
||||||
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
|
try
|
||||||
catch { return null; }
|
{
|
||||||
})
|
return !string.IsNullOrEmpty(hashData[i])
|
||||||
.Where(x => x != null)
|
? BusJsonSerializer.Deserialize<T>(hashData[i])
|
||||||
.ToList();
|
: null;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError($"反序列化失败: {m} - {ex.Message}");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).Where(x => x != null).ToList();
|
||||||
|
|
||||||
var hasNext = validItems.Count > pageSize;
|
var hasNext = validItems.Count > pageSize;
|
||||||
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
|
||||||
|
|
||||||
//分页锚点索引
|
// 修正分页锚点索引
|
||||||
decimal? nextScore = null;
|
decimal? nextScore = null;
|
||||||
string nextMember = null;
|
string nextMember = null;
|
||||||
if (hasNext && actualItems.Any())
|
if (hasNext && actualItems.Any())
|
||||||
|
|||||||
@ -191,41 +191,40 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试Redis批量读取10万条数据性能
|
/// 测试单个测点数据项
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[HttpGet]
|
[HttpGet]
|
||||||
public async Task TestRedisCacheGetAllPagedData()
|
public async Task TestRedisCacheGetAllPagedData()
|
||||||
{
|
{
|
||||||
var timeDensity = "15";
|
var timeDensity = "15";
|
||||||
string SystemType = "Energy";
|
string SystemType = "";
|
||||||
string ServerTagName = "JiSheCollectBus2";
|
string ServerTagName = "JiSheCollectBus2";
|
||||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
|
var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
var timer1 = Stopwatch.StartNew();
|
|
||||||
decimal? cursor = null;
|
decimal? cursor = null;
|
||||||
string member = null;
|
string member = null;
|
||||||
bool hasNext;
|
bool hasNext;
|
||||||
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
var page = await _redisDataCacheService
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
.GetAllPagedData<AmmeterInfo>(
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
redisCacheMeterInfoHashKeyTemp,
|
||||||
pageSize: 1000,
|
redisCacheMeterInfoZSetScoresIndexKeyTemp);
|
||||||
lastScore: cursor,
|
|
||||||
lastMember: member);
|
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
meterInfos.AddRange(page.Items);
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
cursor = page.NextScore;
|
||||||
member = page.HasNext ? page.NextMember : null;
|
member = page.NextMember;
|
||||||
hasNext = page.HasNext;
|
hasNext = page.HasNext;
|
||||||
} while (hasNext);
|
} while (hasNext);
|
||||||
|
|
||||||
timer1.Stop();
|
timer.Stop();
|
||||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
|
||||||
|
_logger.LogInformation($"{nameof(TestRedisCacheGetAllPagedData)} 获取电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -15,7 +15,6 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
|||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Kafka.Producer;
|
using JiShe.CollectBus.Kafka.Producer;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.RedisDataCache;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using Mapster;
|
using Mapster;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -24,7 +23,6 @@ using System.Collections.Generic;
|
|||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -124,44 +122,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//获取缓存中的表信息
|
||||||
|
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
|
||||||
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
|
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
|
||||||
|
|
||||||
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
if (meteryType == MeterTypeEnum.Ammeter.ToString())
|
||||||
{
|
{
|
||||||
var timer = Stopwatch.StartNew();
|
// 解析结果(结果为嵌套数组)
|
||||||
|
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
|
||||||
//获取对应频率中的所有电表信息
|
|
||||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
|
||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
|
||||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
|
|
||||||
|
|
||||||
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
|
||||||
decimal? cursor = null;
|
|
||||||
string member = null;
|
|
||||||
bool hasNext;
|
|
||||||
do
|
|
||||||
{
|
|
||||||
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
|
||||||
pageSize: 1000,
|
|
||||||
lastScore: cursor,
|
|
||||||
lastMember: member);
|
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
|
||||||
cursor = page.HasNext ? page.NextScore : null;
|
|
||||||
member = page.HasNext ? page.NextMember : null;
|
|
||||||
hasNext = page.HasNext;
|
|
||||||
} while (hasNext);
|
|
||||||
|
|
||||||
if (meterInfos == null || meterInfos.Count <= 0)
|
if (meterInfos == null || meterInfos.Count <= 0)
|
||||||
{
|
{
|
||||||
timer.Stop();
|
|
||||||
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
|
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
|
||||||
|
|
||||||
|
var timer = Stopwatch.StartNew();
|
||||||
|
|
||||||
//处理数据
|
//处理数据
|
||||||
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
|
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
|
||||||
@ -178,14 +163,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
|
||||||
items: meterInfos,
|
items: meterInfos,
|
||||||
deviceIdSelector: data => data.FocusAddress,
|
deviceIdSelector: data => data.FocusAddress,
|
||||||
processor: (data,groupIndex) =>
|
processor: data =>
|
||||||
{
|
{
|
||||||
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
|
_ = AmmerterCreatePublishTask(timeDensity, data);
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
timer.Stop();
|
timer.Stop();
|
||||||
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
|
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
|
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
|
||||||
@ -246,64 +231,36 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
// //focusAddressDataLista.Add(item.FocusAddress);
|
// //focusAddressDataLista.Add(item.FocusAddress);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
||||||
|
|
||||||
var timeDensity = "15";
|
var timeDensity = "15";
|
||||||
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
|
||||||
|
|
||||||
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
|
||||||
List<string> focusAddressDataLista = new List<string>();
|
|
||||||
var timer1 = Stopwatch.StartNew();
|
var timer1 = Stopwatch.StartNew();
|
||||||
//decimal? cursor = null;
|
decimal? cursor = null;
|
||||||
//string member = null;
|
|
||||||
//bool hasNext;
|
|
||||||
//do
|
|
||||||
//{
|
|
||||||
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
|
|
||||||
// redisCacheMeterInfoHashKeyTemp,
|
|
||||||
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
|
||||||
// pageSize: 1000,
|
|
||||||
// lastScore: cursor,
|
|
||||||
// lastMember: member);
|
|
||||||
|
|
||||||
// meterInfos.AddRange(page.Items);
|
|
||||||
// cursor = page.HasNext ? page.NextScore : null;
|
|
||||||
// member = page.HasNext ? page.NextMember : null;
|
|
||||||
// hasNext = page.HasNext;
|
|
||||||
//} while (hasNext);
|
|
||||||
|
|
||||||
var allIds = new HashSet<string>();
|
|
||||||
decimal? score = null;
|
|
||||||
string member = null;
|
string member = null;
|
||||||
|
bool hasNext;
|
||||||
while (true)
|
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
|
||||||
|
do
|
||||||
{
|
{
|
||||||
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
|
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
|
||||||
redisCacheMeterInfoHashKeyTemp,
|
redisCacheMeterInfoHashKeyTemp,
|
||||||
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
redisCacheMeterInfoZSetScoresIndexKeyTemp,
|
||||||
pageSize: 1000,
|
pageSize: 1000,
|
||||||
lastScore: score,
|
lastScore: cursor,
|
||||||
lastMember: member);
|
lastMember: member);
|
||||||
|
|
||||||
meterInfos.AddRange(page.Items);
|
meterInfos.AddRange(page.Items);
|
||||||
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
|
cursor = page.HasNext ? page.NextScore : null;
|
||||||
foreach (var item in page.Items)
|
member = page.HasNext ? page.NextMember : null;
|
||||||
{
|
hasNext = page.HasNext;
|
||||||
if (!allIds.Add(item.MemberId))
|
} while (hasNext);
|
||||||
throw new Exception("Duplicate data found!");
|
|
||||||
}
|
|
||||||
if (!page.HasNext) break;
|
|
||||||
score = page.NextScore;
|
|
||||||
member = page.NextMember;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
timer1.Stop();
|
timer1.Stop();
|
||||||
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
|
||||||
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
|
|
||||||
//return;
|
|
||||||
#else
|
#else
|
||||||
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
var meterInfos = await GetAmmeterInfoList(gatherCode);
|
||||||
#endif
|
#endif
|
||||||
@ -699,11 +656,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="timeDensity">采集频率</param>
|
/// <param name="timeDensity">采集频率</param>
|
||||||
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
|
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
|
||||||
/// <param name="groupIndex">集中器所在分组</param>
|
|
||||||
/// <param name="taskBatch">时间格式的任务批次名称</param>
|
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
private async Task AmmerterCreatePublishTask(int timeDensity
|
private async Task AmmerterCreatePublishTask(int timeDensity
|
||||||
, AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
|
, AmmeterInfo ammeterInfo)
|
||||||
{
|
{
|
||||||
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
|
||||||
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
|
||||||
@ -711,9 +666,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
|
||||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||||
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
|
||||||
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
|
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
|
||||||
{
|
{
|
||||||
@ -794,8 +747,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
|
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
|
||||||
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
|
|
||||||
|
|
||||||
foreach (var tempItem in tempCodes)
|
foreach (var tempItem in tempCodes)
|
||||||
{
|
{
|
||||||
@ -850,7 +802,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
|
var meterReadingRecords = new MeterReadingRecords()
|
||||||
{
|
{
|
||||||
ProjectID = ammeterInfo.ProjectID,
|
ProjectID = ammeterInfo.ProjectID,
|
||||||
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
|
||||||
@ -860,7 +812,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
MeterId = ammeterInfo.MeterId,
|
MeterId = ammeterInfo.MeterId,
|
||||||
MeterType = MeterTypeEnum.Ammeter,
|
MeterType = MeterTypeEnum.Ammeter,
|
||||||
FocusAddress = ammeterInfo.FocusAddress,
|
FocusAddress = ammeterInfo.FocusAddress,
|
||||||
FocusId = ammeterInfo.FocusId,
|
FocusID = ammeterInfo.FocusId,
|
||||||
AFN = aFN,
|
AFN = aFN,
|
||||||
Fn = fn,
|
Fn = fn,
|
||||||
ItemCode = tempItem,
|
ItemCode = tempItem,
|
||||||
@ -870,10 +822,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||||
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
||||||
};
|
};
|
||||||
|
meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
||||||
|
|
||||||
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords);
|
||||||
|
|
||||||
taskList.Add(meterReadingRecords);
|
|
||||||
}
|
}
|
||||||
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
|
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
|
||||||
//await Task.Delay(timeSpan);
|
//await Task.Delay(timeSpan);
|
||||||
@ -881,25 +832,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//return keyValuePairs;
|
//return keyValuePairs;
|
||||||
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
|
|
||||||
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
using (var pipe = FreeRedisProvider.Instance.StartPipe())
|
||||||
//{
|
|
||||||
// pipe.HSet(redisCacheKey, keyValuePairs);
|
|
||||||
// object[] ret = pipe.EndPipe();
|
|
||||||
//}
|
|
||||||
if (taskList == null
|
|
||||||
|| taskList.Count() <= 0
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|
|
||||||
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
|
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
|
pipe.HSet(redisCacheKey, keyValuePairs);
|
||||||
return;
|
object[] ret = pipe.EndPipe();
|
||||||
}
|
}
|
||||||
await _redisDataCacheService.BatchInsertDataAsync(
|
|
||||||
redisCacheTelemetryPacketInfoHashKey,
|
|
||||||
redisCacheTelemetryPacketInfoSetIndexKey,
|
await Task.CompletedTask;
|
||||||
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
|
|
||||||
taskList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -1148,7 +1088,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
IssuedMessageId = GuidGenerator.Create().ToString(),
|
IssuedMessageId = GuidGenerator.Create().ToString(),
|
||||||
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
IssuedMessageHexString = Convert.ToHexString(dataInfos),
|
||||||
};
|
};
|
||||||
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
meterReadingRecords.CreateDataId(GuidGenerator.Create());
|
||||||
|
|
||||||
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
|
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,23 +49,23 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
|
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}";
|
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}{"{3}"}";
|
||||||
|
|
||||||
public const string TelemetryPacket = "TelemetryPacket";
|
public const string TelemetryPacket = "TelemetryPacket";
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
|
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}";
|
public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
|
/// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}";
|
public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
|
/// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}";
|
public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
|
||||||
|
|
||||||
///// <summary>
|
///// <summary>
|
||||||
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
|
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
|
||||||
|
|||||||
@ -161,6 +161,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
MaxDegreeOfParallelism = maxThreads.Value,
|
MaxDegreeOfParallelism = maxThreads.Value,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
|
||||||
await Task.Run(() =>
|
await Task.Run(() =>
|
||||||
{
|
{
|
||||||
Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
|
Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
|
||||||
@ -168,7 +169,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
var queue = groupQueues[groupId];
|
var queue = groupQueues[groupId];
|
||||||
while (queue.TryDequeue(out T item))
|
while (queue.TryDequeue(out T item))
|
||||||
{
|
{
|
||||||
processor(item, groupId);
|
await Task.Delay(timeSpan);
|
||||||
|
processor(item, Thread.CurrentThread.ManagedThreadId);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -181,14 +183,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
/// <typeparam name="T">已经分组的设备信息</typeparam>
|
/// <typeparam name="T">已经分组的设备信息</typeparam>
|
||||||
/// <param name="items">部分或者全部的已经分组的设备集合</param>
|
/// <param name="items">部分或者全部的已经分组的设备集合</param>
|
||||||
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
|
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
|
||||||
/// <param name="processor">处理委托(参数:当前对象,分组ID)</param>
|
/// <param name="processor">处理委托(参数:当前对象,线程ID)</param>
|
||||||
/// <param name="maxConcurrency">可选最佳并发度</param>
|
/// <param name="maxConcurrency">可选最佳并发度</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
/// <exception cref="InvalidOperationException"></exception>
|
/// <exception cref="InvalidOperationException"></exception>
|
||||||
public static async Task ProcessWithThrottleAsync<T>(
|
public static async Task ProcessWithThrottleAsync<T>(
|
||||||
List<T> items,
|
List<T> items,
|
||||||
Func<T, string> deviceIdSelector,
|
Func<T, string> deviceIdSelector,
|
||||||
Action<T,int> processor,
|
Action<T> processor,
|
||||||
int? maxConcurrency = null)
|
int? maxConcurrency = null)
|
||||||
{
|
{
|
||||||
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
|
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
|
||||||
@ -242,7 +244,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 分组异步处理(带节流)
|
/// 分组异步处理(带节流)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private static async Task ProcessItemAsync<T>(T item, Action<T,int> processor, int groupId)
|
private static async Task ProcessItemAsync<T>(T item, Action<T> processor, int groupId)
|
||||||
{
|
{
|
||||||
// 使用内存缓存降低CPU负载
|
// 使用内存缓存降低CPU负载
|
||||||
await Task.Yield(); // 立即释放当前线程
|
await Task.Yield(); // 立即释放当前线程
|
||||||
@ -253,7 +255,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
|
|||||||
{
|
{
|
||||||
ExecutionContext.Run(context!, state =>
|
ExecutionContext.Run(context!, state =>
|
||||||
{
|
{
|
||||||
processor(item,groupId);
|
processor(item);
|
||||||
}, null);
|
}, null);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Common.Models
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public virtual string MemberId => $"{FocusId}:{MeterId}";
|
public virtual string MemberID => $"{FocusId}:{MeterId}";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
||||||
|
|||||||
@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters
|
|||||||
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Column(IsIgnore = true)]
|
[Column(IsIgnore = true)]
|
||||||
public override string MemberId => $"{FocusId}:{MeterId}";
|
public override string MemberID => $"{FocusId}:{MeterId}";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
||||||
|
|||||||
@ -1,141 +0,0 @@
|
|||||||
using JiShe.CollectBus.Common.Enums;
|
|
||||||
using JiShe.CollectBus.Common.Models;
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using Volo.Abp.Domain.Entities;
|
|
||||||
using Volo.Abp.Domain.Entities.Auditing;
|
|
||||||
|
|
||||||
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读任务Redis缓存数据记录
|
|
||||||
/// </summary>
|
|
||||||
public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
|
|
||||||
/// </summary>
|
|
||||||
public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
|
|
||||||
/// </summary>
|
|
||||||
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否手动操作
|
|
||||||
/// </summary>
|
|
||||||
public bool ManualOrNot { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 任务数据唯一标记
|
|
||||||
/// </summary>
|
|
||||||
public string TaskMark { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。
|
|
||||||
/// </summary>
|
|
||||||
public long Timestamps { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 是否超时
|
|
||||||
/// </summary>
|
|
||||||
public bool IsTimeout { get; set; } = false;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 待抄读时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime PendingCopyReadTime { get; set; }
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 集中器地址
|
|
||||||
/// </summary>
|
|
||||||
public string FocusAddress { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表地址
|
|
||||||
/// </summary>
|
|
||||||
public string MeterAddress { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 表类型
|
|
||||||
/// </summary>
|
|
||||||
public MeterTypeEnum MeterType { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 项目ID
|
|
||||||
/// </summary>
|
|
||||||
public int ProjectID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 数据库业务ID
|
|
||||||
/// </summary>
|
|
||||||
public int DatabaseBusiID { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// AFN功能码
|
|
||||||
/// </summary>
|
|
||||||
public AFN AFN { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读功能码
|
|
||||||
/// </summary>
|
|
||||||
public int Fn { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 抄读计量点
|
|
||||||
/// </summary>
|
|
||||||
public int Pn { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 采集项编码
|
|
||||||
/// </summary>
|
|
||||||
public string ItemCode { get; set;}
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 创建时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime CreationTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 下发消息内容
|
|
||||||
/// </summary>
|
|
||||||
public string IssuedMessageHexString { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 下发消息Id
|
|
||||||
/// </summary>
|
|
||||||
public string IssuedMessageId { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息上报内容
|
|
||||||
/// </summary>
|
|
||||||
public string? ReceivedMessageHexString { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 消息上报时间
|
|
||||||
/// </summary>
|
|
||||||
public DateTime? ReceivedTime { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 上报消息Id
|
|
||||||
/// </summary>
|
|
||||||
public string ReceivedMessageId { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 上报报文解析备注,异常情况下才有
|
|
||||||
/// </summary>
|
|
||||||
public string ReceivedRemark { get; set; }
|
|
||||||
|
|
||||||
//public void CreateDataId(Guid Id)
|
|
||||||
//{
|
|
||||||
// this.Id = Id;
|
|
||||||
//}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user