diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index 5f0e8f8..eb400c7 100644 --- a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -132,18 +132,7 @@ namespace JiShe.CollectBus.Application.Contracts bool descending = true) where T : DeviceCacheBasicModel; - - /// - /// 优化后的分页获取方法(支持百万级数据) - /// - Task> GetAllPagedDataOptimized( - string redisHashCacheKey, - string redisZSetScoresIndexCacheKey, - int pageSize = 1000, - decimal? lastScore = null, - string lastMember = null, - bool descending = true) where T : DeviceCacheBasicModel; - + ///// ///// 游标分页查询 ///// diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 0d9c80d..43dae23 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -365,182 +365,7 @@ namespace JiShe.CollectBus.RedisDataCache { throw new Exception(); } - - - /// - /// 优化后的分页获取方法(支持百万级数据) - /// - public async Task> GetAllPagedDataOptimized( - string redisHashCacheKey, - string redisZSetScoresIndexCacheKey, - int pageSize = 1000, - decimal? lastScore = null, - string lastMember = null, - bool descending = true) where T : DeviceCacheBasicModel - { - // 参数校验 - if (string.IsNullOrWhiteSpace(redisHashCacheKey) || - string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) - { - _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized)); - return new BusCacheGlobalPagedResult { Items = new List() }; - } - - pageSize = Math.Clamp(pageSize, 1, 10000); - - const string luaScript = @" - local command = ARGV[1] - local range_start = ARGV[2] - local range_end = ARGV[3] - local limit = tonumber(ARGV[4]) - local last_score = ARGV[5] - local last_member = ARGV[6] - - -- 获取扩展数据(5倍分页大小) - local members - if command == 'ZRANGEBYSCORE' then - members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - else - members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - end - - -- 精确分页过滤 - local filtered = {} - local count = 0 - local start_index = 1 - - -- 存在锚点时寻找起始位置 - if last_score ~= '' and last_member ~= '' then - for i=1,#members,2 do - local score = members[i+1] - local member = members[i] - - if command == 'ZRANGEBYSCORE' then - if tonumber(score) > tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member > last_member then - start_index = i - break - end - end - else - if tonumber(score) < tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member < last_member then - start_index = i - break - end - end - end - end - end - - -- 收集有效数据 - for i=start_index,#members,2 do - if count >= limit then break end - table.insert(filtered, members[i]) - table.insert(filtered, members[i+1]) - count = count + 1 - end - - -- 提取有效数据 - local result_members = {} - local result_scores = {} - for i=1,#filtered,2 do - table.insert(result_members, filtered[i]) - table.insert(result_scores, filtered[i+1]) - end - - if #result_members == 0 then return {0,{},{},{}} end - - -- 获取Hash数据 - local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) - return {#result_members, result_members, result_scores, hash_data}"; - - // 构造查询范围(包含等于) - string rangeStart, rangeEnd; - if (descending) - { - rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; - rangeEnd = "-inf"; - } - else - { - rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; - rangeEnd = "+inf"; - } - - try - { - var scriptResult = (object[])await Instance.EvalAsync( - luaScript, - new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, - new object[] - { - descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", - rangeStart, - rangeEnd, - pageSize, - lastScore?.ToString() ?? "", - lastMember ?? "" - }); - - var itemCount = (long)scriptResult[0]; - if (itemCount == 0) - return new BusCacheGlobalPagedResult { Items = new List() }; - - // 处理结果集 - var members = ((object[])scriptResult[1]).Cast().ToList(); - var scores = ((object[])scriptResult[2]).Cast() - .Select(decimal.Parse).ToList(); - var hashData = ((object[])scriptResult[3]).Cast().ToList(); - - var validItems = members.AsParallel() - .Select((m, i) => - { - try { return BusJsonSerializer.Deserialize(hashData[i]); } - catch { return null; } - }) - .Where(x => x != null) - .ToList(); - - // 精确分页控制 - var hasNext = validItems.Count >= pageSize; - var actualItems = validItems.Take(pageSize).ToList(); - - // 计算下一页锚点(必须基于原始排序) - decimal? nextScore = null; - string nextMember = null; - if (hasNext && actualItems.Count > 0) - { - var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1); - nextScore = scores[lastValidIndex]; - nextMember = members[lastValidIndex]; - } - - return new BusCacheGlobalPagedResult - { - Items = actualItems, - HasNext = hasNext, - NextScore = nextScore, - NextMember = nextMember, - TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), - PageSize = pageSize - }; - } - catch (Exception ex) - { - _logger.LogError(ex, "分页查询异常"); - return new BusCacheGlobalPagedResult { Items = new List() }; - } - } - + /// /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// @@ -562,17 +387,14 @@ namespace JiShe.CollectBus.RedisDataCache where T : DeviceCacheBasicModel { // 参数校验增强 - if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || + string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) { _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); - return null; + return new BusCacheGlobalPagedResult { Items = new List() }; } - if (pageSize < 1 || pageSize > 10000) - { - _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102"); - return null; - } + pageSize = Math.Clamp(pageSize, 1, 10000); var luaScript = @" local command = ARGV[1] diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9184a62..b6eb361 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -47,7 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, - IIoTDBProvider dbProvider, + IIoTDBProvider dbProvider, IOptions kafkaOptions) { _logger = logger; @@ -125,7 +125,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; } - + var meterTypes = EnumExtensions.ToEnumDictionary(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) @@ -155,7 +155,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading member = page.HasNext ? page.NextMember : null; hasNext = page.HasNext; } while (hasNext); - + if (meterInfos == null || meterInfos.Count <= 0) { timer.Stop(); @@ -169,9 +169,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //await DeviceGroupBalanceControl.ProcessGenericListAsync( // items: meterInfos, // deviceIdSelector: data => data.FocusAddress, - // processor: (data, threadId) => + // processor: (data, groupIndex) => // { - // _ = AmmerterCreatePublishTask(timeDensity, data); + // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); // } //); @@ -180,9 +180,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, - processor: (data,groupIndex) => + processor: (data, groupIndex) => { - _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); + _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); @@ -247,9 +247,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading // meterInfos.Add(tempData); // //focusAddressDataLista.Add(item.FocusAddress); //} - - - + + + var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; @@ -282,7 +282,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading while (true) { - var page = await _redisDataCacheService.GetAllPagedDataOptimized( + var page = await _redisDataCacheService.GetAllPagedData( redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp, pageSize: 1000, @@ -290,17 +290,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastMember: member); meterInfos.AddRange(page.Items); - focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress)); + focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); foreach (var item in page.Items) { if (!allIds.Add(item.MemberId)) - throw new Exception("Duplicate data found!"); + { + _logger.LogError($"{item.MemberId}Duplicate data found!"); + } } if (!page.HasNext) break; score = page.NextScore; member = page.NextMember; } - + timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); @@ -331,7 +333,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; List ammeterInfos = new List(); //将表计信息根据集中器分组,获得集中器号 @@ -345,7 +347,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading focusAddressDataList.Add(item.Key); - // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; + // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG //每次缓存时,删除缓存,避免缓存数据有不准确的问题 @@ -409,7 +411,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, - redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos); + redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() @@ -623,7 +625,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } - + stopwatch.Stop(); @@ -710,7 +712,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 时间格式的任务批次名称 /// private async Task AmmerterCreatePublishTask(int timeDensity - , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch) + , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? @@ -724,7 +726,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); return; } @@ -750,7 +752,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); return; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) @@ -899,14 +901,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); return; } - await _redisDataCacheService.BatchInsertDataAsync( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoSetIndexKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - taskList); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + foreach (var item in taskList) + { + // 主数据存储Hash + pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize()); + + // Set索引缓存 + pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId); + + // ZSET索引缓存Key + pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId); + } + pipe.EndPipe(); + } + + //await _redisDataCacheService.BatchInsertDataAsync( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoSetIndexKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // taskList); } /// @@ -923,8 +942,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); } int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); - - await _producerService.ProduceAsync(topicName, partition, taskRecord); + + await _producerService.ProduceAsync(topicName, partition, taskRecord); } private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) @@ -1273,7 +1292,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - + //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index c28c82c..067c1a9 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -138,7 +138,7 @@ namespace JiShe.CollectBus.Kafka int threadCount = 0; foreach (var sub in subscribedMethods) { - int partitionCount = kafkaOptionConfig.NumPartitions; + int partitionCount = 3;// kafkaOptionConfig.NumPartitions; //var adminClientService = provider.GetRequiredService(); //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0)