From 13e986168e5dd00e04270ba6b94062bbad744cfa Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Thu, 17 Apr 2025 11:29:26 +0800
Subject: [PATCH] =?UTF-8?q?=E7=99=BE=E4=B8=87=E7=BA=A7=E7=9A=84=E4=BB=BB?=
=?UTF-8?q?=E5=8A=A1=E6=95=B0=E6=8D=AE=E9=9B=86=E4=B8=AD=E5=99=A8=E6=89=80?=
=?UTF-8?q?=E5=9C=A8=E5=88=86=E7=BB=84=E5=8D=95=E7=8B=AC=E4=BF=9D=E5=AD=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../RedisDataCache/IRedisDataCacheService.cs | 57 ++--
.../RedisDataCache/RedisDataCacheService.cs | 263 +++++++++++++++++-
.../Samples/SampleAppService.cs | 25 +-
.../BasicScheduledMeterReadingService.cs | 154 ++++++----
.../Consts/RedisConst.cs | 14 +-
.../DeviceGroupBalanceControl.cs | 12 +-
.../Models/DeviceCacheBasicModel.cs | 2 +-
.../Ammeters/AmmeterInfo.cs | 2 +-
.../MeterReadingTelemetryPacketInfo.cs | 141 ++++++++++
9 files changed, 558 insertions(+), 112 deletions(-)
create mode 100644 src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
index c6f3613..5f0e8f8 100644
--- a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
+++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
@@ -113,7 +113,7 @@ namespace JiShe.CollectBus.Application.Contracts
///
- /// 通过ZSET索引获取数据
+ /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
///
///
/// 主数据存储Hash缓存Key
@@ -133,6 +133,17 @@ namespace JiShe.CollectBus.Application.Contracts
where T : DeviceCacheBasicModel;
+ ///
+ /// 优化后的分页获取方法(支持百万级数据)
+ ///
+ Task> GetAllPagedDataOptimized(
+ string redisHashCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true) where T : DeviceCacheBasicModel;
+
/////
///// 游标分页查询
/////
@@ -149,28 +160,28 @@ namespace JiShe.CollectBus.Application.Contracts
// string excludeMember,
// bool descending);
- /////
- ///// 批量获取指定分页的数据
- /////
- /////
- ///// Hash表缓存key
- ///// Hash表字段集合
- /////
- //Task> BatchGetData(
- // string redisHashCacheKey,
- // IEnumerable members)
- // where T : DeviceCacheBasicModel;
+ /////
+ ///// 批量获取指定分页的数据
+ /////
+ /////
+ ///// Hash表缓存key
+ ///// Hash表字段集合
+ /////
+ //Task> BatchGetData(
+ // string redisHashCacheKey,
+ // IEnumerable members)
+ // where T : DeviceCacheBasicModel;
- /////
- ///// 获取下一页游标
- /////
- ///// 排序索引ZSET缓存Key
- ///// 最后一个唯一标识
- ///// 排序方式
- /////
- //Task GetNextScore(
- // string redisZSetScoresIndexCacheKey,
- // string lastMember,
- // bool descending);
+ /////
+ ///// 获取下一页游标
+ /////
+ ///// 排序索引ZSET缓存Key
+ ///// 最后一个唯一标识
+ ///// 排序方式
+ /////
+ //Task GetNextScore(
+ // string redisZSetScoresIndexCacheKey,
+ // string lastMember,
+ // bool descending);
}
}
diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
index a634789..3c96410 100644
--- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
+++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
@@ -70,13 +70,13 @@ namespace JiShe.CollectBus.RedisDataCache
using (var trans = Instance.Multi())
{
// 主数据存储Hash
- trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize());
+ trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize());
// 集中器号分组索引Set缓存
- trans.SAdd(redisSetIndexCacheKey, data.MemberID);
+ trans.SAdd(redisSetIndexCacheKey, data.MemberId);
// 集中器与表计信息排序索引ZSET缓存Key
- trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID);
+ trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId);
var results = trans.Exec();
@@ -128,13 +128,13 @@ namespace JiShe.CollectBus.RedisDataCache
foreach (var item in batch)
{
// 主数据存储Hash
- pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize());
+ pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
// Set索引缓存
- pipe.SAdd(redisSetIndexCacheKey, item.MemberID);
+ pipe.SAdd(redisSetIndexCacheKey, item.MemberId);
// ZSET索引缓存Key
- pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID);
+ pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
}
pipe.EndPipe();
}
@@ -192,11 +192,11 @@ namespace JiShe.CollectBus.RedisDataCache
redisZSetScoresIndexCacheKey
};
- var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
+ var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId });
if ((int)result == 0)
{
- _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102");
+ _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102");
}
}
@@ -248,13 +248,13 @@ namespace JiShe.CollectBus.RedisDataCache
},
new object[]
{
- newData.MemberID,
+ newData.MemberId,
newData.Serialize()
});
if ((int)result == 0)
{
- _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
+ _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
}
}
@@ -328,7 +328,7 @@ namespace JiShe.CollectBus.RedisDataCache
},
new object[]
{
- newData.MemberID,
+ newData.MemberId,
oldMemberId,
newData.Serialize(),
newData.ScoreValue.ToString() ?? "",
@@ -336,7 +336,7 @@ namespace JiShe.CollectBus.RedisDataCache
if ((int)result == 0)
{
- _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
+ _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102");
}
}
@@ -364,10 +364,245 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel
{
throw new Exception();
- }
+ }
+
///
- /// 通过ZSET索引获取数据
+ /// 优化后的分页获取方法(支持百万级数据)
+ ///
+ public async Task> GetAllPagedDataOptimized(
+ string redisHashCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true) where T : DeviceCacheBasicModel
+ {
+ // 参数校验
+ if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
+ string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
+ return new BusCacheGlobalPagedResult { Items = new List() };
+ }
+
+ pageSize = Math.Clamp(pageSize, 1, 10000);
+
+ const string luaScript = @"
+ local command = ARGV[1]
+ local range_start = ARGV[2]
+ local range_end = ARGV[3]
+ local limit = tonumber(ARGV[4])
+ local last_score = ARGV[5]
+ local last_member = ARGV[6]
+
+ -- 获取扩展数据(3倍分页大小)
+ local members
+ if command == 'ZRANGEBYSCORE' then
+ members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
+ 'WITHSCORES', 'LIMIT', 0, limit * 5)
+ else
+ members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
+ 'WITHSCORES', 'LIMIT', 0, limit * 5)
+ end
+
+ -- 精确分页过滤
+ local filtered = {}
+ local count = 0
+ local start_index = 1
+
+ -- 存在锚点时寻找起始位置
+ if last_score ~= '' and last_member ~= '' then
+ for i=1,#members,2 do
+ local score = members[i+1]
+ local member = members[i]
+
+ if command == 'ZRANGEBYSCORE' then
+ if tonumber(score) > tonumber(last_score) then
+ start_index = i
+ break
+ elseif tonumber(score) == tonumber(last_score) then
+ if member > last_member then
+ start_index = i
+ break
+ end
+ end
+ else
+ if tonumber(score) < tonumber(last_score) then
+ start_index = i
+ break
+ elseif tonumber(score) == tonumber(last_score) then
+ if member < last_member then
+ start_index = i
+ break
+ end
+ end
+ end
+ end
+ end
+
+ -- 收集有效数据
+ for i=start_index,#members,2 do
+ if count >= limit then break end
+ table.insert(filtered, members[i])
+ table.insert(filtered, members[i+1])
+ count = count + 1
+ end
+
+ -- 提取有效数据
+ local result_members = {}
+ local result_scores = {}
+ for i=1,#filtered,2 do
+ table.insert(result_members, filtered[i])
+ table.insert(result_scores, filtered[i+1])
+ end
+
+ if #result_members == 0 then return {0,{},{},{}} end
+
+ -- 获取Hash数据
+ local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
+ return {#result_members, result_members, result_scores, hash_data}";
+
+ // 构造查询范围(包含等于)
+ string rangeStart, rangeEnd;
+ if (descending)
+ {
+ rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
+ rangeEnd = "-inf";
+ }
+ else
+ {
+ rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
+ rangeEnd = "+inf";
+ }
+
+ try
+ {
+ var scriptResult = (object[])await Instance.EvalAsync(
+ luaScript,
+ new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
+ new object[]
+ {
+ descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
+ rangeStart,
+ rangeEnd,
+ pageSize,
+ lastScore?.ToString() ?? "",
+ lastMember ?? ""
+ });
+
+ var itemCount = (long)scriptResult[0];
+ if (itemCount == 0)
+ return new BusCacheGlobalPagedResult { Items = new List() };
+
+ // 处理结果集
+ var members = ((object[])scriptResult[1]).Cast().ToList();
+ var scores = ((object[])scriptResult[2]).Cast()
+ .Select(decimal.Parse).ToList();
+ var hashData = ((object[])scriptResult[3]).Cast().ToList();
+
+ var validItems = members.AsParallel()
+ .Select((m, i) =>
+ {
+ try { return BusJsonSerializer.Deserialize(hashData[i]); }
+ catch { return null; }
+ })
+ .Where(x => x != null)
+ .ToList();
+
+ // 精确分页控制
+ var hasNext = validItems.Count >= pageSize;
+ var actualItems = validItems.Take(pageSize).ToList();
+
+ // 计算下一页锚点(必须基于原始排序)
+ decimal? nextScore = null;
+ string nextMember = null;
+ if (hasNext && actualItems.Count > 0)
+ {
+ var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
+ nextScore = scores[lastValidIndex];
+ nextMember = members[lastValidIndex];
+ }
+
+ return new BusCacheGlobalPagedResult
+ {
+ Items = actualItems,
+ HasNext = hasNext,
+ NextScore = nextScore,
+ NextMember = nextMember,
+ TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
+ PageSize = pageSize
+ };
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "分页查询异常");
+ return new BusCacheGlobalPagedResult { Items = new List() };
+ }
+ }
+
+ ///
+ /// 并行分页导出方法(百万级数据支持)
+ ///
+ public async Task> FullExportParallel(
+ string hashKey,
+ string zsetKey,
+ int parallelDegree = 10,
+ int pageSize = 5000) where T : DeviceCacheBasicModel
+ {
+ var result = new ConcurrentBag();
+ var totalCount = await GetTotalCount(zsetKey);
+ var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize);
+
+ var semaphore = new SemaphoreSlim(parallelDegree);
+ var tasks = new List();
+
+ decimal? lastScore = null;
+ string lastMember = null;
+ var isDescending = true;
+
+ for (int page = 0; page < totalPages; page++)
+ {
+ await semaphore.WaitAsync();
+
+ tasks.Add(Task.Run(async () =>
+ {
+ try
+ {
+ var pageResult = await GetAllPagedData(
+ hashKey,
+ zsetKey,
+ pageSize,
+ lastScore,
+ lastMember,
+ isDescending);
+
+ foreach (var item in pageResult.Items)
+ {
+ result.Add(item);
+ }
+
+ // 更新分页锚点
+ if (pageResult.HasNext)
+ {
+ lastScore = pageResult.NextScore;
+ lastMember = pageResult.NextMember;
+ }
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ }));
+ }
+
+ await Task.WhenAll(tasks);
+ return result.ToList();
+ }
+
+
+ ///
+ /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
///
///
/// 主数据存储Hash缓存Key
diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index 3af4b6a..3658557 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -191,40 +191,41 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
}
///
- /// 测试单个测点数据项
+ /// 测试Redis批量读取10万条数据性能
///
///
[HttpGet]
public async Task TestRedisCacheGetAllPagedData()
{
var timeDensity = "15";
- string SystemType = "";
+ string SystemType = "Energy";
string ServerTagName = "JiSheCollectBus2";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
- var timer = Stopwatch.StartNew();
+ var timer1 = Stopwatch.StartNew();
decimal? cursor = null;
string member = null;
bool hasNext;
List meterInfos = new List();
do
{
- var page = await _redisDataCacheService
- .GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp);
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
meterInfos.AddRange(page.Items);
- cursor = page.NextScore;
- member = page.NextMember;
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
- timer.Stop();
-
- _logger.LogInformation($"{nameof(TestRedisCacheGetAllPagedData)} 获取电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
+ timer1.Stop();
+ _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index d32cdb2..27de191 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -15,6 +15,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
+using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster;
using Microsoft.Extensions.Logging;
@@ -23,6 +24,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
+using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -121,32 +123,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
-
-
-
- //获取缓存中的表信息
- var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104");
- return;
- }
-
+
var meterTypes = EnumExtensions.ToEnumDictionary();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
- // 解析结果(结果为嵌套数组)
- var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
+ var timer = Stopwatch.StartNew();
+
+ //获取对应频率中的所有电表信息
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+
+ List meterInfos = new List();
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
+
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+ } while (hasNext);
+
if (meterInfos == null || meterInfos.Count <= 0)
{
+ timer.Stop();
_logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
- var timer = Stopwatch.StartNew();
//处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
@@ -163,14 +178,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
- processor: data =>
+ processor: (data,groupIndex) =>
{
- _ = AmmerterCreatePublishTask(timeDensity, data);
+ _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
}
);
timer.Stop();
- _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
+ _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息");
}
else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
@@ -230,37 +245,65 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
-
- //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
-
+
+
+
var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- var timer1 = Stopwatch.StartNew();
- decimal? cursor = null;
- string member = null;
- bool hasNext;
List meterInfos = new List();
- do
+ List focusAddressDataLista = new List();
+ var timer1 = Stopwatch.StartNew();
+ //decimal? cursor = null;
+ //string member = null;
+ //bool hasNext;
+ //do
+ //{
+ // var page = await _redisDataCacheService.GetAllPagedDataOptimized(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // pageSize: 1000,
+ // lastScore: cursor,
+ // lastMember: member);
+
+ // meterInfos.AddRange(page.Items);
+ // cursor = page.HasNext ? page.NextScore : null;
+ // member = page.HasNext ? page.NextMember : null;
+ // hasNext = page.HasNext;
+ //} while (hasNext);
+
+ var allIds = new HashSet();
+ decimal? score = null;
+ string member = null;
+
+ while (true)
{
- var page = await _redisDataCacheService.GetAllPagedData(
+ var page = await _redisDataCacheService.GetAllPagedDataOptimized(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
- lastScore: cursor,
+ lastScore: score,
lastMember: member);
meterInfos.AddRange(page.Items);
- cursor = page.HasNext ? page.NextScore : null;
- member = page.HasNext ? page.NextMember : null;
- hasNext = page.HasNext;
- } while (hasNext);
+ focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
+ foreach (var item in page.Items)
+ {
+ if (!allIds.Add(item.MemberId))
+ throw new Exception("Duplicate data found!");
+ }
+ if (!page.HasNext) break;
+ score = page.NextScore;
+ member = page.NextMember;
+ }
+
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
-
+ //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+ //return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@@ -656,9 +699,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 采集频率
/// 集中器号hash分组的集中器集合数据
+ /// 集中器所在分组
+ /// 时间格式的任务批次名称
///
private async Task AmmerterCreatePublishTask(int timeDensity
- , AmmeterInfo ammeterInfo)
+ , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
@@ -666,7 +711,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
- var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
+ var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
@@ -747,7 +794,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
- Dictionary keyValuePairs = new Dictionary();
+ //Dictionary keyValuePairs = new Dictionary();
+ List taskList = new List();
foreach (var tempItem in tempCodes)
{
@@ -802,7 +850,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
- var meterReadingRecords = new MeterReadingRecords()
+ var meterReadingRecords = new MeterReadingTelemetryPacketInfo()
{
ProjectID = ammeterInfo.ProjectID,
DatabaseBusiID = ammeterInfo.DatabaseBusiID,
@@ -812,7 +860,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
- FocusID = ammeterInfo.FocusId,
+ FocusId = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
@@ -822,9 +870,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
- meterReadingRecords.CreateDataId(GuidGenerator.Create());
- keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords);
+ //meterReadingRecords.CreateDataId(GuidGenerator.Create());
+
+ taskList.Add(meterReadingRecords);
}
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
@@ -832,14 +881,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
- using (var pipe = FreeRedisProvider.Instance.StartPipe())
+ //using (var pipe = FreeRedisProvider.Instance.StartPipe())
+ //{
+ // pipe.HSet(redisCacheKey, keyValuePairs);
+ // object[] ret = pipe.EndPipe();
+ //}
+ if (taskList == null
+ || taskList.Count() <= 0
+ || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey)
+ || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
+ || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
- pipe.HSet(redisCacheKey, keyValuePairs);
- object[] ret = pipe.EndPipe();
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
+ return;
}
-
-
- await Task.CompletedTask;
+ await _redisDataCacheService.BatchInsertDataAsync(
+ redisCacheTelemetryPacketInfoHashKey,
+ redisCacheTelemetryPacketInfoSetIndexKey,
+ redisCacheTelemetryPacketInfoZSetScoresIndexKey,
+ taskList);
}
///
@@ -1088,7 +1148,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
- meterReadingRecords.CreateDataId(GuidGenerator.Create());
+ //meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
}
diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
index dce5307..7ac170b 100644
--- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
+++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
@@ -49,23 +49,23 @@ namespace JiShe.CollectBus.Common.Consts
///
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}{"{3}"}";
+ public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}";
public const string TelemetryPacket = "TelemetryPacket";
///
- /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
///
- public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}";
+ public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}";
///
- /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
///
- public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}";
+ public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}";
///
- /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次
///
- public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
+ public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}";
/////
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
index 4be0f4f..06b7d70 100644
--- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
+++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
@@ -161,7 +161,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
MaxDegreeOfParallelism = maxThreads.Value,
};
- TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
await Task.Run(() =>
{
Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
@@ -169,8 +168,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item))
{
- await Task.Delay(timeSpan);
- processor(item, Thread.CurrentThread.ManagedThreadId);
+ processor(item, groupId);
}
});
});
@@ -183,14 +181,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// 已经分组的设备信息
/// 部分或者全部的已经分组的设备集合
/// 从泛型对象提取deviceId
- /// 处理委托(参数:当前对象,线程ID)
+ /// 处理委托(参数:当前对象,分组ID)
/// 可选最佳并发度
///
///
public static async Task ProcessWithThrottleAsync(
List items,
Func deviceIdSelector,
- Action processor,
+ Action processor,
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
@@ -244,7 +242,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
///
/// 分组异步处理(带节流)
///
- private static async Task ProcessItemAsync(T item, Action processor, int groupId)
+ private static async Task ProcessItemAsync(T item, Action processor, int groupId)
{
// 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程
@@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
ExecutionContext.Run(context!, state =>
{
- processor(item);
+ processor(item,groupId);
}, null);
});
}
diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
index 335c17c..d397151 100644
--- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
+++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
@@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Common.Models
///
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
///
- public virtual string MemberID => $"{FocusId}:{MeterId}";
+ public virtual string MemberId => $"{FocusId}:{MeterId}";
///
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index 3df01b5..8b082bb 100644
--- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters
/// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
///
[Column(IsIgnore = true)]
- public override string MemberID => $"{FocusId}:{MeterId}";
+ public override string MemberId => $"{FocusId}:{MeterId}";
///
/// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
new file mode 100644
index 0000000..c3f75d3
--- /dev/null
+++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs
@@ -0,0 +1,141 @@
+using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Models;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Volo.Abp.Domain.Entities;
+using Volo.Abp.Domain.Entities.Auditing;
+
+namespace JiShe.CollectBus.IotSystems.MeterReadingRecords
+{
+ ///
+ /// 抄读任务Redis缓存数据记录
+ ///
+ public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel
+ {
+ ///
+ /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
+ ///
+ public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}";
+
+ ///
+ /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
+ ///
+ public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
+
+
+ ///
+ /// 是否手动操作
+ ///
+ public bool ManualOrNot { get; set; }
+
+ ///
+ /// 任务数据唯一标记
+ ///
+ public string TaskMark { get; set; }
+
+ ///
+ /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。
+ ///
+ public long Timestamps { get; set; }
+
+ ///
+ /// 是否超时
+ ///
+ public bool IsTimeout { get; set; } = false;
+
+ ///
+ /// 待抄读时间
+ ///
+ public DateTime PendingCopyReadTime { get; set; }
+
+
+ ///
+ /// 集中器地址
+ ///
+ public string FocusAddress { get; set; }
+
+ ///
+ /// 表地址
+ ///
+ public string MeterAddress { get; set; }
+
+ ///
+ /// 表类型
+ ///
+ public MeterTypeEnum MeterType { get; set; }
+
+ ///
+ /// 项目ID
+ ///
+ public int ProjectID { get; set; }
+
+ ///
+ /// 数据库业务ID
+ ///
+ public int DatabaseBusiID { get; set; }
+
+ ///
+ /// AFN功能码
+ ///
+ public AFN AFN { get; set; }
+
+ ///
+ /// 抄读功能码
+ ///
+ public int Fn { get; set; }
+
+ ///
+ /// 抄读计量点
+ ///
+ public int Pn { get; set; }
+
+ ///
+ /// 采集项编码
+ ///
+ public string ItemCode { get; set;}
+
+
+ ///
+ /// 创建时间
+ ///
+ public DateTime CreationTime { get; set; }
+
+ ///
+ /// 下发消息内容
+ ///
+ public string IssuedMessageHexString { get; set; }
+
+ ///
+ /// 下发消息Id
+ ///
+ public string IssuedMessageId { get; set; }
+
+ ///
+ /// 消息上报内容
+ ///
+ public string? ReceivedMessageHexString { get; set; }
+
+ ///
+ /// 消息上报时间
+ ///
+ public DateTime? ReceivedTime { get; set; }
+
+ ///
+ /// 上报消息Id
+ ///
+ public string ReceivedMessageId { get; set; }
+
+ ///
+ /// 上报报文解析备注,异常情况下才有
+ ///
+ public string ReceivedRemark { get; set; }
+
+ //public void CreateDataId(Guid Id)
+ //{
+ // this.Id = Id;
+ //}
+ }
+}