diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index c6f3613..5f0e8f8 100644 --- a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -113,7 +113,7 @@ namespace JiShe.CollectBus.Application.Contracts /// - /// 通过ZSET索引获取数据 + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// /// /// 主数据存储Hash缓存Key @@ -133,6 +133,17 @@ namespace JiShe.CollectBus.Application.Contracts where T : DeviceCacheBasicModel; + /// + /// 优化后的分页获取方法(支持百万级数据) + /// + Task> GetAllPagedDataOptimized( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) where T : DeviceCacheBasicModel; + ///// ///// 游标分页查询 ///// @@ -149,28 +160,28 @@ namespace JiShe.CollectBus.Application.Contracts // string excludeMember, // bool descending); - ///// - ///// 批量获取指定分页的数据 - ///// - ///// - ///// Hash表缓存key - ///// Hash表字段集合 - ///// - //Task> BatchGetData( - // string redisHashCacheKey, - // IEnumerable members) - // where T : DeviceCacheBasicModel; + ///// + ///// 批量获取指定分页的数据 + ///// + ///// + ///// Hash表缓存key + ///// Hash表字段集合 + ///// + //Task> BatchGetData( + // string redisHashCacheKey, + // IEnumerable members) + // where T : DeviceCacheBasicModel; - ///// - ///// 获取下一页游标 - ///// - ///// 排序索引ZSET缓存Key - ///// 最后一个唯一标识 - ///// 排序方式 - ///// - //Task GetNextScore( - // string redisZSetScoresIndexCacheKey, - // string lastMember, - // bool descending); + ///// + ///// 获取下一页游标 + ///// + ///// 排序索引ZSET缓存Key + ///// 最后一个唯一标识 + ///// 排序方式 + ///// + //Task GetNextScore( + // string redisZSetScoresIndexCacheKey, + // string lastMember, + // bool descending); } } diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index a634789..3c96410 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -70,13 +70,13 @@ namespace JiShe.CollectBus.RedisDataCache using (var trans = Instance.Multi()) { // 主数据存储Hash - trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize()); + trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize()); // 集中器号分组索引Set缓存 - trans.SAdd(redisSetIndexCacheKey, data.MemberID); + trans.SAdd(redisSetIndexCacheKey, data.MemberId); // 集中器与表计信息排序索引ZSET缓存Key - trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID); + trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId); var results = trans.Exec(); @@ -128,13 +128,13 @@ namespace JiShe.CollectBus.RedisDataCache foreach (var item in batch) { // 主数据存储Hash - pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize()); + pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize()); // Set索引缓存 - pipe.SAdd(redisSetIndexCacheKey, item.MemberID); + pipe.SAdd(redisSetIndexCacheKey, item.MemberId); // ZSET索引缓存Key - pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID); + pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId); } pipe.EndPipe(); } @@ -192,11 +192,11 @@ namespace JiShe.CollectBus.RedisDataCache redisZSetScoresIndexCacheKey }; - var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID }); + var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId }); if ((int)result == 0) { - _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102"); + _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102"); } } @@ -248,13 +248,13 @@ namespace JiShe.CollectBus.RedisDataCache }, new object[] { - newData.MemberID, + newData.MemberId, newData.Serialize() }); if ((int)result == 0) { - _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102"); + _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102"); } } @@ -328,7 +328,7 @@ namespace JiShe.CollectBus.RedisDataCache }, new object[] { - newData.MemberID, + newData.MemberId, oldMemberId, newData.Serialize(), newData.ScoreValue.ToString() ?? "", @@ -336,7 +336,7 @@ namespace JiShe.CollectBus.RedisDataCache if ((int)result == 0) { - _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102"); + _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102"); } } @@ -364,10 +364,245 @@ namespace JiShe.CollectBus.RedisDataCache where T : DeviceCacheBasicModel { throw new Exception(); - } + } + /// - /// 通过ZSET索引获取数据 + /// 优化后的分页获取方法(支持百万级数据) + /// + public async Task> GetAllPagedDataOptimized( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) where T : DeviceCacheBasicModel + { + // 参数校验 + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || + string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized)); + return new BusCacheGlobalPagedResult { Items = new List() }; + } + + pageSize = Math.Clamp(pageSize, 1, 10000); + + const string luaScript = @" + local command = ARGV[1] + local range_start = ARGV[2] + local range_end = ARGV[3] + local limit = tonumber(ARGV[4]) + local last_score = ARGV[5] + local last_member = ARGV[6] + + -- 获取扩展数据(3倍分页大小) + local members + if command == 'ZRANGEBYSCORE' then + members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + end + + -- 精确分页过滤 + local filtered = {} + local count = 0 + local start_index = 1 + + -- 存在锚点时寻找起始位置 + if last_score ~= '' and last_member ~= '' then + for i=1,#members,2 do + local score = members[i+1] + local member = members[i] + + if command == 'ZRANGEBYSCORE' then + if tonumber(score) > tonumber(last_score) then + start_index = i + break + elseif tonumber(score) == tonumber(last_score) then + if member > last_member then + start_index = i + break + end + end + else + if tonumber(score) < tonumber(last_score) then + start_index = i + break + elseif tonumber(score) == tonumber(last_score) then + if member < last_member then + start_index = i + break + end + end + end + end + end + + -- 收集有效数据 + for i=start_index,#members,2 do + if count >= limit then break end + table.insert(filtered, members[i]) + table.insert(filtered, members[i+1]) + count = count + 1 + end + + -- 提取有效数据 + local result_members = {} + local result_scores = {} + for i=1,#filtered,2 do + table.insert(result_members, filtered[i]) + table.insert(result_scores, filtered[i+1]) + end + + if #result_members == 0 then return {0,{},{},{}} end + + -- 获取Hash数据 + local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) + return {#result_members, result_members, result_scores, hash_data}"; + + // 构造查询范围(包含等于) + string rangeStart, rangeEnd; + if (descending) + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; + rangeEnd = "-inf"; + } + else + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; + rangeEnd = "+inf"; + } + + try + { + var scriptResult = (object[])await Instance.EvalAsync( + luaScript, + new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, + new object[] + { + descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", + rangeStart, + rangeEnd, + pageSize, + lastScore?.ToString() ?? "", + lastMember ?? "" + }); + + var itemCount = (long)scriptResult[0]; + if (itemCount == 0) + return new BusCacheGlobalPagedResult { Items = new List() }; + + // 处理结果集 + var members = ((object[])scriptResult[1]).Cast().ToList(); + var scores = ((object[])scriptResult[2]).Cast() + .Select(decimal.Parse).ToList(); + var hashData = ((object[])scriptResult[3]).Cast().ToList(); + + var validItems = members.AsParallel() + .Select((m, i) => + { + try { return BusJsonSerializer.Deserialize(hashData[i]); } + catch { return null; } + }) + .Where(x => x != null) + .ToList(); + + // 精确分页控制 + var hasNext = validItems.Count >= pageSize; + var actualItems = validItems.Take(pageSize).ToList(); + + // 计算下一页锚点(必须基于原始排序) + decimal? nextScore = null; + string nextMember = null; + if (hasNext && actualItems.Count > 0) + { + var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1); + nextScore = scores[lastValidIndex]; + nextMember = members[lastValidIndex]; + } + + return new BusCacheGlobalPagedResult + { + Items = actualItems, + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember, + TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), + PageSize = pageSize + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "分页查询异常"); + return new BusCacheGlobalPagedResult { Items = new List() }; + } + } + + /// + /// 并行分页导出方法(百万级数据支持) + /// + public async Task> FullExportParallel( + string hashKey, + string zsetKey, + int parallelDegree = 10, + int pageSize = 5000) where T : DeviceCacheBasicModel + { + var result = new ConcurrentBag(); + var totalCount = await GetTotalCount(zsetKey); + var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize); + + var semaphore = new SemaphoreSlim(parallelDegree); + var tasks = new List(); + + decimal? lastScore = null; + string lastMember = null; + var isDescending = true; + + for (int page = 0; page < totalPages; page++) + { + await semaphore.WaitAsync(); + + tasks.Add(Task.Run(async () => + { + try + { + var pageResult = await GetAllPagedData( + hashKey, + zsetKey, + pageSize, + lastScore, + lastMember, + isDescending); + + foreach (var item in pageResult.Items) + { + result.Add(item); + } + + // 更新分页锚点 + if (pageResult.HasNext) + { + lastScore = pageResult.NextScore; + lastMember = pageResult.NextMember; + } + } + finally + { + semaphore.Release(); + } + })); + } + + await Task.WhenAll(tasks); + return result.ToList(); + } + + + /// + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// /// /// 主数据存储Hash缓存Key diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 3af4b6a..3658557 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -191,40 +191,41 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS } /// - /// 测试单个测点数据项 + /// 测试Redis批量读取10万条数据性能 /// /// [HttpGet] public async Task TestRedisCacheGetAllPagedData() { var timeDensity = "15"; - string SystemType = ""; + string SystemType = "Energy"; string ServerTagName = "JiSheCollectBus2"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var timer = Stopwatch.StartNew(); + var timer1 = Stopwatch.StartNew(); decimal? cursor = null; string member = null; bool hasNext; List meterInfos = new List(); do { - var page = await _redisDataCacheService - .GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp); + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); meterInfos.AddRange(page.Items); - cursor = page.NextScore; - member = page.NextMember; + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; hasNext = page.HasNext; } while (hasNext); - timer.Stop(); - - _logger.LogInformation($"{nameof(TestRedisCacheGetAllPagedData)} 获取电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒"); + timer1.Stop(); + _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index d32cdb2..27de191 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -15,6 +15,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; using Mapster; using Microsoft.Extensions.Logging; @@ -23,6 +24,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -121,32 +123,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; } - - - - //获取缓存中的表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meteryType, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104"); - return; - } - + var meterTypes = EnumExtensions.ToEnumDictionary(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + if (meterInfos == null || meterInfos.Count <= 0) { + timer.Stop(); _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - var timer = Stopwatch.StartNew(); //处理数据 //await DeviceGroupBalanceControl.ProcessGenericListAsync( @@ -163,14 +178,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, - processor: data => + processor: (data,groupIndex) => { - _ = AmmerterCreatePublishTask(timeDensity, data); + _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); timer.Stop(); - _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) @@ -230,37 +245,65 @@ namespace JiShe.CollectBus.ScheduledMeterReading // meterInfos.Add(tempData); // //focusAddressDataLista.Add(item.FocusAddress); //} - - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - + + + var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var timer1 = Stopwatch.StartNew(); - decimal? cursor = null; - string member = null; - bool hasNext; List meterInfos = new List(); - do + List focusAddressDataLista = new List(); + var timer1 = Stopwatch.StartNew(); + //decimal? cursor = null; + //string member = null; + //bool hasNext; + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedDataOptimized( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); + + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); + + var allIds = new HashSet(); + decimal? score = null; + string member = null; + + while (true) { - var page = await _redisDataCacheService.GetAllPagedData( + var page = await _redisDataCacheService.GetAllPagedDataOptimized( redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp, pageSize: 1000, - lastScore: cursor, + lastScore: score, lastMember: member); meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress)); + foreach (var item in page.Items) + { + if (!allIds.Add(item.MemberId)) + throw new Exception("Duplicate data found!"); + } + if (!page.HasNext) break; + score = page.NextScore; + member = page.NextMember; + } + timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -656,9 +699,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集频率 /// 集中器号hash分组的集中器集合数据 + /// 集中器所在分组 + /// 时间格式的任务批次名称 /// private async Task AmmerterCreatePublishTask(int timeDensity - , AmmeterInfo ammeterInfo) + , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? @@ -666,7 +711,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { @@ -747,7 +794,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - Dictionary keyValuePairs = new Dictionary(); + //Dictionary keyValuePairs = new Dictionary(); + List taskList = new List(); foreach (var tempItem in tempCodes) { @@ -802,7 +850,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading - var meterReadingRecords = new MeterReadingRecords() + var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { ProjectID = ammeterInfo.ProjectID, DatabaseBusiID = ammeterInfo.DatabaseBusiID, @@ -812,7 +860,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading MeterId = ammeterInfo.MeterId, MeterType = MeterTypeEnum.Ammeter, FocusAddress = ammeterInfo.FocusAddress, - FocusID = ammeterInfo.FocusId, + FocusId = ammeterInfo.FocusId, AFN = aFN, Fn = fn, ItemCode = tempItem, @@ -822,9 +870,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(dataInfos), }; - meterReadingRecords.CreateDataId(GuidGenerator.Create()); - keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords); + //meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + taskList.Add(meterReadingRecords); } //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); //await Task.Delay(timeSpan); @@ -832,14 +881,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading //return keyValuePairs; // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); - using (var pipe = FreeRedisProvider.Instance.StartPipe()) + //using (var pipe = FreeRedisProvider.Instance.StartPipe()) + //{ + // pipe.HSet(redisCacheKey, keyValuePairs); + // object[] ret = pipe.EndPipe(); + //} + if (taskList == null + || taskList.Count() <= 0 + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - pipe.HSet(redisCacheKey, keyValuePairs); - object[] ret = pipe.EndPipe(); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + return; } - - - await Task.CompletedTask; + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoSetIndexKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + taskList); } /// @@ -1088,7 +1148,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(dataInfos), }; - meterReadingRecords.CreateDataId(GuidGenerator.Create()); + //meterReadingRecords.CreateDataId(GuidGenerator.Create()); keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords); } diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs index dce5307..7ac170b 100644 --- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -49,23 +49,23 @@ namespace JiShe.CollectBus.Common.Consts /// /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}{"{3}"}"; + public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}"; public const string TelemetryPacket = "TelemetryPacket"; /// - /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 /// - public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}"; + public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}"; /// - /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 /// - public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}"; + public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}"; /// - /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 /// - public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}"; + public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}"; ///// ///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记 diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 4be0f4f..06b7d70 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -161,7 +161,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl MaxDegreeOfParallelism = maxThreads.Value, }; - TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); await Task.Run(() => { Parallel.For(0, cache.CachedGroups.Length, options, async groupId => @@ -169,8 +168,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl var queue = groupQueues[groupId]; while (queue.TryDequeue(out T item)) { - await Task.Delay(timeSpan); - processor(item, Thread.CurrentThread.ManagedThreadId); + processor(item, groupId); } }); }); @@ -183,14 +181,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// 已经分组的设备信息 /// 部分或者全部的已经分组的设备集合 /// 从泛型对象提取deviceId - /// 处理委托(参数:当前对象,线程ID) + /// 处理委托(参数:当前对象,分组ID) /// 可选最佳并发度 /// /// public static async Task ProcessWithThrottleAsync( List items, Func deviceIdSelector, - Action processor, + Action processor, int? maxConcurrency = null) { var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); @@ -244,7 +242,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// /// 分组异步处理(带节流) /// - private static async Task ProcessItemAsync(T item, Action processor, int groupId) + private static async Task ProcessItemAsync(T item, Action processor, int groupId) { // 使用内存缓存降低CPU负载 await Task.Yield(); // 立即释放当前线程 @@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl { ExecutionContext.Run(context!, state => { - processor(item); + processor(item,groupId); }, null); }); } diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index 335c17c..d397151 100644 --- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Common.Models /// /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// - public virtual string MemberID => $"{FocusId}:{MeterId}"; + public virtual string MemberId => $"{FocusId}:{MeterId}"; /// /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index 3df01b5..8b082bb 100644 --- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Ammeters /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// [Column(IsIgnore = true)] - public override string MemberID => $"{FocusId}:{MeterId}"; + public override string MemberId => $"{FocusId}:{MeterId}"; /// /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs new file mode 100644 index 0000000..c3f75d3 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -0,0 +1,141 @@ +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Entities.Auditing; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 抄读任务Redis缓存数据记录 + /// + public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel + { + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; + + + /// + /// 是否手动操作 + /// + public bool ManualOrNot { get; set; } + + /// + /// 任务数据唯一标记 + /// + public string TaskMark { get; set; } + + /// + /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 + /// + public long Timestamps { get; set; } + + /// + /// 是否超时 + /// + public bool IsTimeout { get; set; } = false; + + /// + /// 待抄读时间 + /// + public DateTime PendingCopyReadTime { get; set; } + + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set; } + + /// + /// 表地址 + /// + public string MeterAddress { get; set; } + + /// + /// 表类型 + /// + public MeterTypeEnum MeterType { get; set; } + + /// + /// 项目ID + /// + public int ProjectID { get; set; } + + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + + /// + /// AFN功能码 + /// + public AFN AFN { get; set; } + + /// + /// 抄读功能码 + /// + public int Fn { get; set; } + + /// + /// 抄读计量点 + /// + public int Pn { get; set; } + + /// + /// 采集项编码 + /// + public string ItemCode { get; set;} + + + /// + /// 创建时间 + /// + public DateTime CreationTime { get; set; } + + /// + /// 下发消息内容 + /// + public string IssuedMessageHexString { get; set; } + + /// + /// 下发消息Id + /// + public string IssuedMessageId { get; set; } + + /// + /// 消息上报内容 + /// + public string? ReceivedMessageHexString { get; set; } + + /// + /// 消息上报时间 + /// + public DateTime? ReceivedTime { get; set; } + + /// + /// 上报消息Id + /// + public string ReceivedMessageId { get; set; } + + /// + /// 上报报文解析备注,异常情况下才有 + /// + public string ReceivedRemark { get; set; } + + //public void CreateDataId(Guid Id) + //{ + // this.Id = Id; + //} + } +}