百万级的任务数据集中器所在分组单独保存

This commit is contained in:
ChenYi 2025-04-17 11:29:26 +08:00
parent 4737c5b53d
commit 13e986168e
9 changed files with 558 additions and 112 deletions

View File

@ -113,7 +113,7 @@ namespace JiShe.CollectBus.Application.Contracts
/// <summary> /// <summary>
/// 通过ZSET索引获取数据 /// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param> /// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
@ -133,6 +133,17 @@ namespace JiShe.CollectBus.Application.Contracts
where T : DeviceCacheBasicModel; where T : DeviceCacheBasicModel;
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel;
///// <summary> ///// <summary>
///// 游标分页查询 ///// 游标分页查询
///// </summary> ///// </summary>
@ -149,28 +160,28 @@ namespace JiShe.CollectBus.Application.Contracts
// string excludeMember, // string excludeMember,
// bool descending); // bool descending);
///// <summary> ///// <summary>
///// 批量获取指定分页的数据 ///// 批量获取指定分页的数据
///// </summary> ///// </summary>
///// <typeparam name="T"></typeparam> ///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">Hash表缓存key</param> ///// <param name="redisHashCacheKey">Hash表缓存key</param>
///// <param name="members">Hash表字段集合</param> ///// <param name="members">Hash表字段集合</param>
///// <returns></returns> ///// <returns></returns>
//Task<Dictionary<string, T>> BatchGetData<T>( //Task<Dictionary<string, T>> BatchGetData<T>(
// string redisHashCacheKey, // string redisHashCacheKey,
// IEnumerable<string> members) // IEnumerable<string> members)
// where T : DeviceCacheBasicModel; // where T : DeviceCacheBasicModel;
///// <summary> ///// <summary>
///// 获取下一页游标 ///// 获取下一页游标
///// </summary> ///// </summary>
///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param> ///// <param name="redisZSetScoresIndexCacheKey">排序索引ZSET缓存Key</param>
///// <param name="lastMember">最后一个唯一标识</param> ///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param> ///// <param name="descending">排序方式</param>
///// <returns></returns> ///// <returns></returns>
//Task<decimal?> GetNextScore( //Task<decimal?> GetNextScore(
// string redisZSetScoresIndexCacheKey, // string redisZSetScoresIndexCacheKey,
// string lastMember, // string lastMember,
// bool descending); // bool descending);
} }
} }

View File

@ -70,13 +70,13 @@ namespace JiShe.CollectBus.RedisDataCache
using (var trans = Instance.Multi()) using (var trans = Instance.Multi())
{ {
// 主数据存储Hash // 主数据存储Hash
trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize()); trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
// 集中器号分组索引Set缓存 // 集中器号分组索引Set缓存
trans.SAdd(redisSetIndexCacheKey, data.MemberID); trans.SAdd(redisSetIndexCacheKey, data.MemberId);
// 集中器与表计信息排序索引ZSET缓存Key // 集中器与表计信息排序索引ZSET缓存Key
trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID); trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
var results = trans.Exec(); var results = trans.Exec();
@ -128,13 +128,13 @@ namespace JiShe.CollectBus.RedisDataCache
foreach (var item in batch) foreach (var item in batch)
{ {
// 主数据存储Hash // 主数据存储Hash
pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize()); pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
// Set索引缓存 // Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, item.MemberID); pipe.SAdd(redisSetIndexCacheKey, item.MemberId);
// ZSET索引缓存Key // ZSET索引缓存Key
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID); pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
} }
pipe.EndPipe(); pipe.EndPipe();
} }
@ -192,11 +192,11 @@ namespace JiShe.CollectBus.RedisDataCache
redisZSetScoresIndexCacheKey redisZSetScoresIndexCacheKey
}; };
var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID }); var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
if ((int)result == 0) if ((int)result == 0)
{ {
_logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102"); _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
} }
} }
@ -248,13 +248,13 @@ namespace JiShe.CollectBus.RedisDataCache
}, },
new object[] new object[]
{ {
newData.MemberID, newData.MemberId,
newData.Serialize() newData.Serialize()
}); });
if ((int)result == 0) if ((int)result == 0)
{ {
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102"); _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
} }
} }
@ -328,7 +328,7 @@ namespace JiShe.CollectBus.RedisDataCache
}, },
new object[] new object[]
{ {
newData.MemberID, newData.MemberId,
oldMemberId, oldMemberId,
newData.Serialize(), newData.Serialize(),
newData.ScoreValue.ToString() ?? "", newData.ScoreValue.ToString() ?? "",
@ -336,7 +336,7 @@ namespace JiShe.CollectBus.RedisDataCache
if ((int)result == 0) if ((int)result == 0)
{ {
_logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102"); _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
} }
} }
@ -364,10 +364,245 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel where T : DeviceCacheBasicModel
{ {
throw new Exception(); throw new Exception();
} }
/// <summary> /// <summary>
/// 通过ZSET索引获取数据 /// 优化后的分页获取方法(支持百万级数据)
/// </summary>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel
{
// 参数校验
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
pageSize = Math.Clamp(pageSize, 1, 10000);
const string luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
-- 3
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
end
--
local filtered = {}
local count = 0
local start_index = 1
--
if last_score ~= '' and last_member ~= '' then
for i=1,#members,2 do
local score = members[i+1]
local member = members[i]
if command == 'ZRANGEBYSCORE' then
if tonumber(score) > tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member > last_member then
start_index = i
break
end
end
else
if tonumber(score) < tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member < last_member then
start_index = i
break
end
end
end
end
end
--
for i=start_index,#members,2 do
if count >= limit then break end
table.insert(filtered, members[i])
table.insert(filtered, members[i+1])
count = count + 1
end
--
local result_members = {}
local result_scores = {}
for i=1,#filtered,2 do
table.insert(result_members, filtered[i])
table.insert(result_scores, filtered[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 构造查询范围(包含等于)
string rangeStart, rangeEnd;
if (descending)
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
rangeEnd = "-inf";
}
else
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
rangeEnd = "+inf";
}
try
{
var scriptResult = (object[])await Instance.EvalAsync(
luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[]
{
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart,
rangeEnd,
pageSize,
lastScore?.ToString() ?? "",
lastMember ?? ""
});
var itemCount = (long)scriptResult[0];
if (itemCount == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>()
.Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
var validItems = members.AsParallel()
.Select((m, i) =>
{
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
catch { return null; }
})
.Where(x => x != null)
.ToList();
// 精确分页控制
var hasNext = validItems.Count >= pageSize;
var actualItems = validItems.Take(pageSize).ToList();
// 计算下一页锚点(必须基于原始排序)
decimal? nextScore = null;
string nextMember = null;
if (hasNext && actualItems.Count > 0)
{
var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
nextScore = scores[lastValidIndex];
nextMember = members[lastValidIndex];
}
return new BusCacheGlobalPagedResult<T>
{
Items = actualItems,
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember,
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
PageSize = pageSize
};
}
catch (Exception ex)
{
_logger.LogError(ex, "分页查询异常");
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
}
/// <summary>
/// 并行分页导出方法(百万级数据支持)
/// </summary>
public async Task<List<T>> FullExportParallel<T>(
string hashKey,
string zsetKey,
int parallelDegree = 10,
int pageSize = 5000) where T : DeviceCacheBasicModel
{
var result = new ConcurrentBag<T>();
var totalCount = await GetTotalCount(zsetKey);
var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize);
var semaphore = new SemaphoreSlim(parallelDegree);
var tasks = new List<Task>();
decimal? lastScore = null;
string lastMember = null;
var isDescending = true;
for (int page = 0; page < totalPages; page++)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
var pageResult = await GetAllPagedData<T>(
hashKey,
zsetKey,
pageSize,
lastScore,
lastMember,
isDescending);
foreach (var item in pageResult.Items)
{
result.Add(item);
}
// 更新分页锚点
if (pageResult.HasNext)
{
lastScore = pageResult.NextScore;
lastMember = pageResult.NextMember;
}
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
return result.ToList();
}
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary> /// </summary>
/// <typeparam name="T"></typeparam> /// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param> /// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>

View File

@ -191,40 +191,41 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
} }
/// <summary> /// <summary>
/// 测试单个测点数据项 /// 测试Redis批量读取10万条数据性能
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
[HttpGet] [HttpGet]
public async Task TestRedisCacheGetAllPagedData() public async Task TestRedisCacheGetAllPagedData()
{ {
var timeDensity = "15"; var timeDensity = "15";
string SystemType = ""; string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus2"; string ServerTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var timer = Stopwatch.StartNew();
var timer1 = Stopwatch.StartNew();
decimal? cursor = null; decimal? cursor = null;
string member = null; string member = null;
bool hasNext; bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do do
{ {
var page = await _redisDataCacheService var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
.GetAllPagedData<AmmeterInfo>( redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp); pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
cursor = page.NextScore; cursor = page.HasNext ? page.NextScore : null;
member = page.NextMember; member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext; hasNext = page.HasNext;
} while (hasNext); } while (hasNext);
timer.Stop(); timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
_logger.LogInformation($"{nameof(TestRedisCacheGetAllPagedData)} 获取电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
} }

View File

@ -15,6 +15,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster; using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -23,6 +24,7 @@ using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading namespace JiShe.CollectBus.ScheduledMeterReading
{ {
@ -121,32 +123,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue; continue;
} }
//获取缓存中的表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
return;
}
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>(); var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
// 解析结果(结果为嵌套数组) var timer = Stopwatch.StartNew();
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
//获取对应频率中的所有电表信息
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
timer.Stop();
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return; return;
} }
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
var timer = Stopwatch.StartNew();
//处理数据 //处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync( //await DeviceGroupBalanceControl.ProcessGenericListAsync(
@ -163,14 +178,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: data => processor: (data,groupIndex) =>
{ {
_ = AmmerterCreatePublishTask(timeDensity, data); _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
} }
); );
timer.Stop(); timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@ -230,37 +245,65 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// meterInfos.Add(tempData); // meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress); // //focusAddressDataLista.Add(item.FocusAddress);
//} //}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
var timeDensity = "15"; var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var timer1 = Stopwatch.StartNew();
decimal? cursor = null;
string member = null;
bool hasNext;
List<AmmeterInfo> meterInfos = new List<AmmeterInfo>(); List<AmmeterInfo> meterInfos = new List<AmmeterInfo>();
do List<string> focusAddressDataLista = new List<string>();
var timer1 = Stopwatch.StartNew();
//decimal? cursor = null;
//string member = null;
//bool hasNext;
//do
//{
// var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
// redisCacheMeterInfoHashKeyTemp,
// redisCacheMeterInfoZSetScoresIndexKeyTemp,
// pageSize: 1000,
// lastScore: cursor,
// lastMember: member);
// meterInfos.AddRange(page.Items);
// cursor = page.HasNext ? page.NextScore : null;
// member = page.HasNext ? page.NextMember : null;
// hasNext = page.HasNext;
//} while (hasNext);
var allIds = new HashSet<string>();
decimal? score = null;
string member = null;
while (true)
{ {
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>( var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000, pageSize: 1000,
lastScore: cursor, lastScore: score,
lastMember: member); lastMember: member);
meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null; focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
member = page.HasNext ? page.NextMember : null; foreach (var item in page.Items)
hasNext = page.HasNext; {
} while (hasNext); if (!allIds.Add(item.MemberId))
throw new Exception("Duplicate data found!");
}
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
timer1.Stop(); timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//return;
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif
@ -656,9 +699,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
/// <param name="timeDensity">采集频率</param> /// <param name="timeDensity">采集频率</param>
/// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param> /// <param name="ammeterInfo">集中器号hash分组的集中器集合数据</param>
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns> /// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo) , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
{ {
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
@ -666,7 +711,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型 //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
@ -747,7 +794,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>(); //Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
List<MeterReadingTelemetryPacketInfo> taskList = new List<MeterReadingTelemetryPacketInfo>();
foreach (var tempItem in tempCodes) foreach (var tempItem in tempCodes)
{ {
@ -802,7 +850,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterReadingRecords = new MeterReadingRecords() var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{ {
ProjectID = ammeterInfo.ProjectID, ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID, DatabaseBusiID = ammeterInfo.DatabaseBusiID,
@ -812,7 +860,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterId = ammeterInfo.MeterId, MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress, FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusId, FocusId = ammeterInfo.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
ItemCode = tempItem, ItemCode = tempItem,
@ -822,9 +870,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos), IssuedMessageHexString = Convert.ToHexString(dataInfos),
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords); //meterReadingRecords.CreateDataId(GuidGenerator.Create());
taskList.Add(meterReadingRecords);
} }
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan); //await Task.Delay(timeSpan);
@ -832,14 +881,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return keyValuePairs; //return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
using (var pipe = FreeRedisProvider.Instance.StartPipe()) //using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// pipe.HSet(redisCacheKey, keyValuePairs);
// object[] ret = pipe.EndPipe();
//}
if (taskList == null
|| taskList.Count() <= 0
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{ {
pipe.HSet(redisCacheKey, keyValuePairs); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
object[] ret = pipe.EndPipe(); return;
} }
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheTelemetryPacketInfoHashKey,
await Task.CompletedTask; redisCacheTelemetryPacketInfoSetIndexKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
taskList);
} }
/// <summary> /// <summary>
@ -1088,7 +1148,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos), IssuedMessageHexString = Convert.ToHexString(dataInfos),
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create()); //meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
} }

View File

@ -49,23 +49,23 @@ namespace JiShe.CollectBus.Common.Consts
/// <summary> /// <summary>
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary> /// </summary>
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}{"{3}"}"; public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}";
public const string TelemetryPacket = "TelemetryPacket"; public const string TelemetryPacket = "TelemetryPacket";
/// <summary> /// <summary>
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}"; public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}";
/// <summary> /// <summary>
/// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}"; public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}";
/// <summary> /// <summary>
/// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率{4}=>集中器所在分组,{5}=>时间格式的任务批次
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}"; public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}";
///// <summary> ///// <summary>
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记 ///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记

View File

@ -161,7 +161,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
MaxDegreeOfParallelism = maxThreads.Value, MaxDegreeOfParallelism = maxThreads.Value,
}; };
TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
await Task.Run(() => await Task.Run(() =>
{ {
Parallel.For(0, cache.CachedGroups.Length, options, async groupId => Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
@ -169,8 +168,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
var queue = groupQueues[groupId]; var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item)) while (queue.TryDequeue(out T item))
{ {
await Task.Delay(timeSpan); processor(item, groupId);
processor(item, Thread.CurrentThread.ManagedThreadId);
} }
}); });
}); });
@ -183,14 +181,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <typeparam name="T">已经分组的设备信息</typeparam> /// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param> /// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param> /// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托(参数:当前对象,线程ID</param> /// <param name="processor">处理委托(参数:当前对象,分组ID</param>
/// <param name="maxConcurrency">可选最佳并发度</param> /// <param name="maxConcurrency">可选最佳并发度</param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="InvalidOperationException"></exception> /// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessWithThrottleAsync<T>( public static async Task ProcessWithThrottleAsync<T>(
List<T> items, List<T> items,
Func<T, string> deviceIdSelector, Func<T, string> deviceIdSelector,
Action<T> processor, Action<T,int> processor,
int? maxConcurrency = null) int? maxConcurrency = null)
{ {
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
@ -244,7 +242,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <summary> /// <summary>
/// 分组异步处理(带节流) /// 分组异步处理(带节流)
/// </summary> /// </summary>
private static async Task ProcessItemAsync<T>(T item, Action<T> processor, int groupId) private static async Task ProcessItemAsync<T>(T item, Action<T,int> processor, int groupId)
{ {
// 使用内存缓存降低CPU负载 // 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程 await Task.Yield(); // 立即释放当前线程
@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{ {
ExecutionContext.Run(context!, state => ExecutionContext.Run(context!, state =>
{ {
processor(item); processor(item,groupId);
}, null); }, null);
}); });
} }

View File

@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Common.Models
/// <summary> /// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义 /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
public virtual string MemberID => $"{FocusId}:{MeterId}"; public virtual string MemberId => $"{FocusId}:{MeterId}";
/// <summary> /// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳 /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳

View File

@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义 /// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary> /// </summary>
[Column(IsIgnore = true)] [Column(IsIgnore = true)]
public override string MemberID => $"{FocusId}:{MeterId}"; public override string MemberId => $"{FocusId}:{MeterId}";
/// <summary> /// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳 /// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳

View File

@ -0,0 +1,141 @@
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Entities.Auditing;
namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
{
/// <summary>
/// 抄读任务Redis缓存数据记录
/// </summary>
public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
{
/// <summary>
/// 关系映射标识用于ZSet的Member字段和Set的Value字段具体值可以根据不同业务场景进行定义
/// </summary>
public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
/// <summary>
/// ZSet排序索引分数值具体值可以根据不同业务场景进行定义例如时间戳
/// </summary>
public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
/// <summary>
/// 是否手动操作
/// </summary>
public bool ManualOrNot { get; set; }
/// <summary>
/// 任务数据唯一标记
/// </summary>
public string TaskMark { get; set; }
/// <summary>
/// 时间戳标记IoTDB时间列处理上报通过构建标记获取唯一标记匹配时间戳。
/// </summary>
public long Timestamps { get; set; }
/// <summary>
/// 是否超时
/// </summary>
public bool IsTimeout { get; set; } = false;
/// <summary>
/// 待抄读时间
/// </summary>
public DateTime PendingCopyReadTime { get; set; }
/// <summary>
/// 集中器地址
/// </summary>
public string FocusAddress { get; set; }
/// <summary>
/// 表地址
/// </summary>
public string MeterAddress { get; set; }
/// <summary>
/// 表类型
/// </summary>
public MeterTypeEnum MeterType { get; set; }
/// <summary>
/// 项目ID
/// </summary>
public int ProjectID { get; set; }
/// <summary>
/// 数据库业务ID
/// </summary>
public int DatabaseBusiID { get; set; }
/// <summary>
/// AFN功能码
/// </summary>
public AFN AFN { get; set; }
/// <summary>
/// 抄读功能码
/// </summary>
public int Fn { get; set; }
/// <summary>
/// 抄读计量点
/// </summary>
public int Pn { get; set; }
/// <summary>
/// 采集项编码
/// </summary>
public string ItemCode { get; set;}
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreationTime { get; set; }
/// <summary>
/// 下发消息内容
/// </summary>
public string IssuedMessageHexString { get; set; }
/// <summary>
/// 下发消息Id
/// </summary>
public string IssuedMessageId { get; set; }
/// <summary>
/// 消息上报内容
/// </summary>
public string? ReceivedMessageHexString { get; set; }
/// <summary>
/// 消息上报时间
/// </summary>
public DateTime? ReceivedTime { get; set; }
/// <summary>
/// 上报消息Id
/// </summary>
public string ReceivedMessageId { get; set; }
/// <summary>
/// 上报报文解析备注,异常情况下才有
/// </summary>
public string ReceivedRemark { get; set; }
//public void CreateDataId(Guid Id)
//{
// this.Id = Id;
//}
}
}