diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
index 4d78706..8f50017 100644
--- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
+++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
@@ -12,6 +12,9 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
+using static FreeSql.Internal.GlobalFilter;
+using static System.Runtime.InteropServices.JavaScript.JSType;
+using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
namespace JiShe.CollectBus.RedisDataCache
{
@@ -118,7 +121,7 @@ namespace JiShe.CollectBus.RedisDataCache
await semaphore.WaitAsync();
_ = Task.Run(() =>
- {
+ {
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
@@ -372,129 +375,172 @@ namespace JiShe.CollectBus.RedisDataCache
/// 最后一个索引
/// 最后一个唯一标识
/// 排序方式
- ///
+ ///
public async Task> GetAllPagedData(
- string redisHashCacheKey,
- string redisZSetScoresIndexCacheKey,
- int pageSize = 1000,
- decimal? lastScore = null,
- string lastMember = null,
- bool descending = true)
- where T : DeviceCacheBasicModel
+ string redisHashCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel
{
// 参数校验(保持不变)
- if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
+ string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
- _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
+ _logger.LogError("参数异常: HashKey或ZSetKey为空");
return null;
}
if (pageSize < 1 || pageSize > 10000)
- {
- _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
- return null;
- }
+ throw new ArgumentException("分页大小应在1-10000之间");
+ // 更新后的Lua脚本
var luaScript = @"
- local command = ARGV[1]
- local range_start = ARGV[2]
- local range_end = ARGV[3]
- local limit = tonumber(ARGV[4])
- local last_score = ARGV[5]
- local last_member = ARGV[6]
+local command = ARGV[1]
+local range_start = ARGV[2]
+local range_end = ARGV[3]
+local limit = tonumber(ARGV[4])
+local last_score = ARGV[5]
+local last_member = ARGV[6]
- -- 处理相同分数下的字典序分页
- if last_score ~= '' and last_member ~= '' then
+-- 调整range_start,当有last_score且没有last_member时
+if last_score ~= '' and last_member == '' then
+ if command == 'ZRANGEBYSCORE' then
+ range_start = '('..last_score
+ else
+ range_start = '('..last_score
+ end
+end
+
+local members
+if command == 'ZRANGEBYSCORE' then
+ members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit)
+else
+ members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit)
+end
+
+if #members == 0 then return {0,{},{},{}} end
+
+local result_members = {}
+local result_scores = {}
+
+-- 如果有last_member,进行过滤
+if last_member ~= '' and last_score ~= '' then
+ for i = 1, #members, 2 do
+ local member = members[i]
+ local score = members[i+1]
+ local include = true
+
+ if score == last_score then
if command == 'ZRANGEBYSCORE' then
- range_start = '(' .. last_score
- range_end = '(' .. last_score .. ' ' .. last_member
+ -- 升序:member必须 > last_member
+ if member <= last_member then
+ include = false
+ end
else
- range_start = '(' .. last_score .. ' ' .. last_member
- range_end = '(' .. last_score
+ -- 降序:member必须 < last_member
+ if member >= last_member then
+ include = false
+ end
end
end
- -- 执行范围查询
- local members
- if command == 'ZRANGEBYSCORE' then
- members = redis.call(command, KEYS[1], range_start, range_end,
- 'WITHSCORES', 'LIMIT', 0, limit)
- else
- members = redis.call(command, KEYS[1], range_end, range_start,
- 'WITHSCORES', 'LIMIT', 0, limit)
+ if include then
+ table.insert(result_members, member)
+ table.insert(result_scores, score)
end
+ end
+else
+ for i = 1, #members, 2 do
+ table.insert(result_members, members[i])
+ table.insert(result_scores, members[i+1])
+ end
+end
- -- 提取成员和分数
- local result_members = {}
- local result_scores = {}
- for i = 1, #members, 2 do
- table.insert(result_members, members[i])
- table.insert(result_scores, members[i+1])
- end
+-- 截取前limit条
+local count = #result_members
+if count > limit then
+ result_members = {unpack(result_members, 1, limit)}
+ result_scores = {unpack(result_scores, 1, limit)}
+end
- -- 获取Hash数据
- local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
+if #result_members == 0 then
+ return {0, {}, {}, {}}
+end
- return {
- #result_members,
- result_members,
- result_scores,
- hash_data
- }";
+-- 获取Hash数据
+local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
+return {#result_members, result_members, result_scores, hash_data}";
- //正确设置范围参数
+ // 修复点:根据是否传递lastMember决定rangeStart是否排他
string rangeStart, rangeEnd;
if (descending)
{
- rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf";
- rangeEnd = "-inf"; // 降序时固定为最小值
+ rangeStart = lastScore.HasValue
+ ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
+ : "+inf";
+ rangeEnd = "-inf";
}
else
{
- rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf";
- rangeEnd = "+inf"; // 升序时固定为最大值
+ rangeStart = lastScore.HasValue
+ ? (string.IsNullOrEmpty(lastMember) ? "(" + lastScore.Value.ToString() : lastScore.Value.ToString())
+ : "-inf";
+ rangeEnd = "+inf";
}
- var result = (object[])await Instance.EvalAsync(
- luaScript,
+ // 执行Lua脚本(保持不变)
+ var scriptResult = (object[])await Instance.EvalAsync(luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[]
{
- descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
- rangeStart,
- rangeEnd,
- (pageSize + 1).ToString(), // 多取1条用于判断hasNext
- lastScore?.ToString() ?? "",
- lastMember ?? ""
+ descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
+ rangeStart,
+ rangeEnd,
+ (pageSize + 1).ToString(),
+ lastScore?.ToString() ?? "",
+ lastMember ?? ""
});
- if ((long)result[0] == 0)
+ // 处理空结果(保持不变)
+ if ((long)scriptResult[0] == 0)
return new BusCacheGlobalPagedResult { Items = new List() };
- // 处理结果集
- var members = ((object[])result[1]).Cast().ToList();
- var scores = ((object[])result[2]).Cast().Select(decimal.Parse).ToList();
- var hashData = ((object[])result[3]).Cast().ToList();
+ // 数据提取(保持不变)
+ var members = ((object[])scriptResult[1]).Cast().ToList();
+ var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList();
+ var hashData = ((object[])scriptResult[3]).Cast().ToList();
- //合并有效数据并处理游标
- var validItems = members.Zip(hashData, (m, h) =>
- !string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize(h) : null)
- .Where(x => x != null)
- .Take(pageSize + 1)
- .ToList();
+ // 反序列化处理(保持不变)
+ var validItems = members.Select((m, i) =>
+ {
+ try
+ {
+ return !string.IsNullOrEmpty(hashData[i])
+ ? BusJsonSerializer.Deserialize(hashData[i])
+ : null;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError($"反序列化失败: {m} - {ex.Message}");
+ return null;
+ }
+ }).Where(x => x != null).Take(pageSize + 1).ToList();
+ // 分页逻辑(保持不变)
var hasNext = validItems.Count > pageSize;
var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
- // 计算下一页起始点
- string nextMember = null;
+ // 计算下一页锚点(保持不变)
decimal? nextScore = null;
- if (hasNext)
+ string nextMember = null;
+ if (hasNext && actualItems.Any())
{
- // 获取实际返回的最后一条有效数据
- var lastValidIndex = actualItems.Count() - 1;
- nextMember = members[lastValidIndex];
- nextScore = scores[lastValidIndex];
+ var lastIndex = Math.Min(members.Count - 1, pageSize);
+ nextScore = scores[lastIndex];
+ nextMember = members[lastIndex];
}
return new BusCacheGlobalPagedResult
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index ce4635b..4f8ae1f 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -213,50 +213,50 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
#if DEBUG
- //var timeDensity = "15";
- //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
- ////获取缓存中的电表信息
- //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
+ var timeDensity = "15";
+ string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
+ //获取缓存中的电表信息
+ var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
- //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
- ////List focusAddressDataLista = new List();
- //List meterInfos = new List();
- //foreach (var item in tempMeterInfos)
- //{
- // var tempData = item.Adapt();
- // tempData.FocusId = item.FocusID;
- // tempData.MeterId = item.Id;
- // meterInfos.Add(tempData);
- // //focusAddressDataLista.Add(item.FocusAddress);
- //}
+ var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+ var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
+ //List focusAddressDataLista = new List();
+ List meterInfos = new List();
+ foreach (var item in tempMeterInfos)
+ {
+ var tempData = item.Adapt();
+ tempData.FocusId = item.FocusID;
+ tempData.MeterId = item.Id;
+ meterInfos.Add(tempData);
+ //focusAddressDataLista.Add(item.FocusAddress);
+ }
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
- var timeDensity = "15";
- var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ //var timeDensity = "15";
+ //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
- decimal? cursor = null;
- string member = null;
- bool hasNext;
- List meterInfos = new List();
- do
- {
- var page = await _redisDataCacheService.GetAllPagedData(
- redisCacheMeterInfoHashKeyTemp,
- redisCacheMeterInfoZSetScoresIndexKeyTemp,
- pageSize: 1000,
- lastScore: cursor,
- lastMember: member);
+ //decimal? cursor = null;
+ //string member = null;
+ //bool hasNext;
+ //List meterInfos = new List();
+ //do
+ //{
+ // var page = await _redisDataCacheService.GetAllPagedData(
+ // redisCacheMeterInfoHashKeyTemp,
+ // redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ // pageSize: 1000,
+ // lastScore: cursor,
+ // lastMember: member);
- meterInfos.AddRange(page.Items);
- cursor = page.HasNext ? page.NextScore : null;
- member = page.HasNext ? page.NextMember : null;
- hasNext = page.HasNext;
- } while (hasNext);
+ // meterInfos.AddRange(page.Items);
+ // cursor = page.HasNext ? page.NextScore : null;
+ // member = page.HasNext ? page.NextMember : null;
+ // hasNext = page.HasNext;
+ //} while (hasNext);
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs
index 80d2f23..b17e9c2 100644
--- a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs
+++ b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs
@@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions
if (buffer.Count > 0)
yield return buffer;
}
+
+ //public static IEnumerable> Batch(this IEnumerable source, int batchSize)
+ //{
+ // if (batchSize <= 0)
+ // throw new ArgumentOutOfRangeException(nameof(batchSize));
+
+ // using var enumerator = source.GetEnumerator();
+ // while (enumerator.MoveNext())
+ // {
+ // yield return GetBatch(enumerator, batchSize);
+ // }
+ //}
+
+ //private static IEnumerable GetBatch(IEnumerator enumerator, int batchSize)
+ //{
+ // do
+ // {
+ // yield return enumerator.Current;
+ // batchSize--;
+ // } while (batchSize > 0 && enumerator.MoveNext());
+ //}
}
}
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index 4ac667d..3df01b5 100644
--- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -1,4 +1,5 @@
-using JiShe.CollectBus.Common.Models;
+using FreeSql.DataAnnotations;
+using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -8,7 +9,19 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters
{
public class AmmeterInfo: DeviceCacheBasicModel
- {
+ {
+ ///
+ /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
+ ///
+ [Column(IsIgnore = true)]
+ public override string MemberID => $"{FocusId}:{MeterId}";
+
+ ///
+ /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
+ ///
+ [Column(IsIgnore = true)]
+ public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks;
+
///
/// 电表名称
///
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index bb09204..ae2025b 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -129,7 +129,7 @@
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
- "ServerTagName": "JiSheCollectBus3",
+ "ServerTagName": "JiSheCollectBus2",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
}
\ No newline at end of file