From d002472854b83a546f75c224924536db8caa26fc Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 17 Apr 2025 15:49:57 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E5=87=8F=E5=B0=91Kafka=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E7=BA=BF=E7=A8=8B=E6=95=B0=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RedisDataCache/IRedisDataCacheService.cs | 13 +- .../RedisDataCache/RedisDataCacheService.cs | 188 +----------------- .../BasicScheduledMeterReadingService.cs | 79 +++++--- .../KafkaSubcribesExtensions.cs | 2 +- 4 files changed, 56 insertions(+), 226 deletions(-) diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index 5f0e8f8..eb400c7 100644 --- a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -132,18 +132,7 @@ namespace JiShe.CollectBus.Application.Contracts bool descending = true) where T : DeviceCacheBasicModel; - - /// - /// 优化后的分页获取方法(支持百万级数据) - /// - Task> GetAllPagedDataOptimized( - string redisHashCacheKey, - string redisZSetScoresIndexCacheKey, - int pageSize = 1000, - decimal? lastScore = null, - string lastMember = null, - bool descending = true) where T : DeviceCacheBasicModel; - + ///// ///// 游标分页查询 ///// diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 0d9c80d..43dae23 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -365,182 +365,7 @@ namespace JiShe.CollectBus.RedisDataCache { throw new Exception(); } - - - /// - /// 优化后的分页获取方法(支持百万级数据) - /// - public async Task> GetAllPagedDataOptimized( - string redisHashCacheKey, - string redisZSetScoresIndexCacheKey, - int pageSize = 1000, - decimal? lastScore = null, - string lastMember = null, - bool descending = true) where T : DeviceCacheBasicModel - { - // 参数校验 - if (string.IsNullOrWhiteSpace(redisHashCacheKey) || - string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) - { - _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized)); - return new BusCacheGlobalPagedResult { Items = new List() }; - } - - pageSize = Math.Clamp(pageSize, 1, 10000); - - const string luaScript = @" - local command = ARGV[1] - local range_start = ARGV[2] - local range_end = ARGV[3] - local limit = tonumber(ARGV[4]) - local last_score = ARGV[5] - local last_member = ARGV[6] - - -- 获取扩展数据(5倍分页大小) - local members - if command == 'ZRANGEBYSCORE' then - members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - else - members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - end - - -- 精确分页过滤 - local filtered = {} - local count = 0 - local start_index = 1 - - -- 存在锚点时寻找起始位置 - if last_score ~= '' and last_member ~= '' then - for i=1,#members,2 do - local score = members[i+1] - local member = members[i] - - if command == 'ZRANGEBYSCORE' then - if tonumber(score) > tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member > last_member then - start_index = i - break - end - end - else - if tonumber(score) < tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member < last_member then - start_index = i - break - end - end - end - end - end - - -- 收集有效数据 - for i=start_index,#members,2 do - if count >= limit then break end - table.insert(filtered, members[i]) - table.insert(filtered, members[i+1]) - count = count + 1 - end - - -- 提取有效数据 - local result_members = {} - local result_scores = {} - for i=1,#filtered,2 do - table.insert(result_members, filtered[i]) - table.insert(result_scores, filtered[i+1]) - end - - if #result_members == 0 then return {0,{},{},{}} end - - -- 获取Hash数据 - local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) - return {#result_members, result_members, result_scores, hash_data}"; - - // 构造查询范围(包含等于) - string rangeStart, rangeEnd; - if (descending) - { - rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; - rangeEnd = "-inf"; - } - else - { - rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; - rangeEnd = "+inf"; - } - - try - { - var scriptResult = (object[])await Instance.EvalAsync( - luaScript, - new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, - new object[] - { - descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", - rangeStart, - rangeEnd, - pageSize, - lastScore?.ToString() ?? "", - lastMember ?? "" - }); - - var itemCount = (long)scriptResult[0]; - if (itemCount == 0) - return new BusCacheGlobalPagedResult { Items = new List() }; - - // 处理结果集 - var members = ((object[])scriptResult[1]).Cast().ToList(); - var scores = ((object[])scriptResult[2]).Cast() - .Select(decimal.Parse).ToList(); - var hashData = ((object[])scriptResult[3]).Cast().ToList(); - - var validItems = members.AsParallel() - .Select((m, i) => - { - try { return BusJsonSerializer.Deserialize(hashData[i]); } - catch { return null; } - }) - .Where(x => x != null) - .ToList(); - - // 精确分页控制 - var hasNext = validItems.Count >= pageSize; - var actualItems = validItems.Take(pageSize).ToList(); - - // 计算下一页锚点(必须基于原始排序) - decimal? nextScore = null; - string nextMember = null; - if (hasNext && actualItems.Count > 0) - { - var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1); - nextScore = scores[lastValidIndex]; - nextMember = members[lastValidIndex]; - } - - return new BusCacheGlobalPagedResult - { - Items = actualItems, - HasNext = hasNext, - NextScore = nextScore, - NextMember = nextMember, - TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), - PageSize = pageSize - }; - } - catch (Exception ex) - { - _logger.LogError(ex, "分页查询异常"); - return new BusCacheGlobalPagedResult { Items = new List() }; - } - } - + /// /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// @@ -562,17 +387,14 @@ namespace JiShe.CollectBus.RedisDataCache where T : DeviceCacheBasicModel { // 参数校验增强 - if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || + string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) { _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); - return null; + return new BusCacheGlobalPagedResult { Items = new List() }; } - if (pageSize < 1 || pageSize > 10000) - { - _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102"); - return null; - } + pageSize = Math.Clamp(pageSize, 1, 10000); var luaScript = @" local command = ARGV[1] diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9184a62..b6eb361 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -47,7 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, - IIoTDBProvider dbProvider, + IIoTDBProvider dbProvider, IOptions kafkaOptions) { _logger = logger; @@ -125,7 +125,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; } - + var meterTypes = EnumExtensions.ToEnumDictionary(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) @@ -155,7 +155,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading member = page.HasNext ? page.NextMember : null; hasNext = page.HasNext; } while (hasNext); - + if (meterInfos == null || meterInfos.Count <= 0) { timer.Stop(); @@ -169,9 +169,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //await DeviceGroupBalanceControl.ProcessGenericListAsync( // items: meterInfos, // deviceIdSelector: data => data.FocusAddress, - // processor: (data, threadId) => + // processor: (data, groupIndex) => // { - // _ = AmmerterCreatePublishTask(timeDensity, data); + // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); // } //); @@ -180,9 +180,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, - processor: (data,groupIndex) => + processor: (data, groupIndex) => { - _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); + _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); @@ -247,9 +247,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading // meterInfos.Add(tempData); // //focusAddressDataLista.Add(item.FocusAddress); //} - - - + + + var timeDensity = "15"; var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; @@ -282,7 +282,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading while (true) { - var page = await _redisDataCacheService.GetAllPagedDataOptimized( + var page = await _redisDataCacheService.GetAllPagedData( redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp, pageSize: 1000, @@ -290,17 +290,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastMember: member); meterInfos.AddRange(page.Items); - focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress)); + focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); foreach (var item in page.Items) { if (!allIds.Add(item.MemberId)) - throw new Exception("Duplicate data found!"); + { + _logger.LogError($"{item.MemberId}Duplicate data found!"); + } } if (!page.HasNext) break; score = page.NextScore; member = page.NextMember; } - + timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); @@ -331,7 +333,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; List ammeterInfos = new List(); //将表计信息根据集中器分组,获得集中器号 @@ -345,7 +347,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading focusAddressDataList.Add(item.Key); - // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; + // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG //每次缓存时,删除缓存,避免缓存数据有不准确的问题 @@ -409,7 +411,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _redisDataCacheService.BatchInsertDataAsync( redisCacheMeterInfoHashKey, redisCacheMeterInfoSetIndexKey, - redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos); + redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() @@ -623,7 +625,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } - + stopwatch.Stop(); @@ -710,7 +712,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 时间格式的任务批次名称 /// private async Task AmmerterCreatePublishTask(int timeDensity - , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch) + , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? @@ -724,7 +726,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); return; } @@ -750,7 +752,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) { - // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); + // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); return; } if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) @@ -899,14 +901,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); return; } - await _redisDataCacheService.BatchInsertDataAsync( - redisCacheTelemetryPacketInfoHashKey, - redisCacheTelemetryPacketInfoSetIndexKey, - redisCacheTelemetryPacketInfoZSetScoresIndexKey, - taskList); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + foreach (var item in taskList) + { + // 主数据存储Hash + pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize()); + + // Set索引缓存 + pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId); + + // ZSET索引缓存Key + pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId); + } + pipe.EndPipe(); + } + + //await _redisDataCacheService.BatchInsertDataAsync( + // redisCacheTelemetryPacketInfoHashKey, + // redisCacheTelemetryPacketInfoSetIndexKey, + // redisCacheTelemetryPacketInfoZSetScoresIndexKey, + // taskList); } /// @@ -923,8 +942,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); } int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); - - await _producerService.ProduceAsync(topicName, partition, taskRecord); + + await _producerService.ProduceAsync(topicName, partition, taskRecord); } private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) @@ -1273,7 +1292,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - + //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs index c28c82c..067c1a9 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs @@ -138,7 +138,7 @@ namespace JiShe.CollectBus.Kafka int threadCount = 0; foreach (var sub in subscribedMethods) { - int partitionCount = kafkaOptionConfig.NumPartitions; + int partitionCount = 3;// kafkaOptionConfig.NumPartitions; //var adminClientService = provider.GetRequiredService(); //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; if (partitionCount <= 0) From f1082ce8a299a1b74b8c016941044410292b30fb Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Thu, 17 Apr 2025 16:19:46 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BA=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.sln | 7 ------- .../BasicScheduledMeterReadingService.cs | 4 ++-- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index ea5e278..9c67aae 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -39,8 +39,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -115,10 +113,6 @@ Global {443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU {443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU - {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU - {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU - {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -141,7 +135,6 @@ Global {A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} - {FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 9184a62..4cd968c 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -304,8 +304,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif From 3020a4967245b87be96e3f1504022eada18b8634 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 17 Apr 2025 17:23:20 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Samples/SampleAppService.cs | 13 + .../BasicScheduledMeterReadingService.cs | 392 +++++------------- .../DeviceGroupBalanceControl.cs | 4 +- 3 files changed, 109 insertions(+), 300 deletions(-) diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 09bd962..a47ae6c 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -226,6 +226,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); + + List focusAddressDataLista = new List(); + foreach (var item in meterInfos) + { + focusAddressDataLista.Add(item.FocusAddress); + } + + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + + // 打印分布统计 + DeviceGroupBalanceControl.PrintDistributionStats(); + + await Task.CompletedTask; } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index b6eb361..1537a29 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -25,6 +25,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; using static FreeSql.Internal.GlobalFilter; @@ -182,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading deviceIdSelector: data => data.FocusAddress, processor: (data, groupIndex) => { - _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); + AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); @@ -582,126 +583,104 @@ namespace JiShe.CollectBus.ScheduledMeterReading int timeDensity = 15; var currentDateTime = DateTime.Now; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); - return; - } + // 自动计算最佳并发度 + int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount(); - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + var options = new ParallelOptions { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) + MaxDegreeOfParallelism = recommendedThreads, + }; + string taskBatch = "20250417155016"; + Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex => { - foreach (var ammerterItem in focusItem.Value) + Console.WriteLine($"15分钟采集电表数据:{groupIndex}"); + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + pageSize: 1000, + lastScore: cursor, + lastMember: member); - _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; - //_ = _producerBus.Publish(tempMsg); + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, groupIndex) => + { + _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex); + } + ); - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); - } + } while (hasNext); + }); + + + + //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); + //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) + //{ + // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101"); + // return; + //} + + ////获取下发任务缓存数据 + //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); + //if (meterTaskInfos == null || meterTaskInfos.Count <= 0) + //{ + // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); + // return; + //} + + //List meterTaskInfosList = new List(); + + ////将取出的缓存任务数据发送到Kafka消息队列中 + //foreach (var focusItem in meterTaskInfos) + //{ + // foreach (var ammerterItem in focusItem.Value) + // { + // var tempMsg = new ScheduledMeterReadingIssuedEventMessage() + // { + // MessageHexString = ammerterItem.Value.IssuedMessageHexString, + // MessageId = ammerterItem.Value.IssuedMessageId, + // FocusAddress = ammerterItem.Value.FocusAddress, + // TimeDensity = timeDensity.ToString(), + // }; + // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + + // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + + // //_ = _producerBus.Publish(tempMsg); + + // meterTaskInfosList.Add(ammerterItem.Value); + // } + //} + //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) + //{ + // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); + //} - stopwatch.Stop(); + //stopwatch.Stop(); - _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); - } - - /// - /// 电表采集任务指令创建 - /// - /// 采集频率1分钟、5分钟、15分钟 - /// 集中器数据分组 - /// - private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary> focusGroup) - { - if (timeDensity <= 0) - { - timeDensity = 1; - } - - if (timeDensity > 15) - { - timeDensity = 15; - } - - if (focusGroup == null || focusGroup.Count <= 0) - { - _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); - return; - } - try - { - //将采集器编号的hash值取模分组 - const int TotalShards = 1024; - var focusHashGroups = new Dictionary>>(); - - foreach (var (collectorId, ammetersDictionary) in focusGroup) - { - if (string.IsNullOrWhiteSpace(collectorId)) - { - _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102"); - continue; - } - - // 计算哈希分组ID - int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards); - - // 获取或创建分组(避免重复查找) - if (!focusHashGroups.TryGetValue(hashGroupId, out var group)) - { - group = new Dictionary>(); - focusHashGroups[hashGroupId] = group; - } - - // 将当前集中器数据加入分组 - group[collectorId] = ammetersDictionary; - } - - if (focusHashGroups == null) - { - _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103"); - return; - } - - //根据分组创建线程批处理集中器 - foreach (var group in focusHashGroups) - { - await AmmerterCreatePublishTask2(timeDensity, group.Value); - } - } - catch (Exception) - { - - throw; - } + //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } + /// /// 电表创建发布任务 @@ -711,7 +690,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 时间格式的任务批次名称 /// - private async Task AmmerterCreatePublishTask(int timeDensity + private void AmmerterCreatePublishTask(int timeDensity , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; @@ -719,7 +698,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; @@ -933,15 +912,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 主题名称 /// 任务记录 + /// 对应分区,也就是集中器号所在的分组序号 /// private async Task KafkaProducerIssuedMessage(string topicName, - MeterReadingRecords taskRecord) + MeterReadingTelemetryPacketInfo taskRecord,int partition) { if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null) { throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); } - int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); await _producerService.ProduceAsync(topicName, partition, taskRecord); } @@ -997,192 +976,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } } - - /// - /// 电表创建发布任务 - /// - /// 采集频率 - /// 集中器号hash分组的集中器集合数据 - /// - private async Task AmmerterCreatePublishTask2(int timeDensity - , Dictionary> focusGroup) - { - var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; - //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? - - var currentTime = DateTime.Now; - var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); - foreach (var focusInfo in focusGroup) - { - //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; - - foreach (var ammeterInfo in focusInfo.Value) - { - var ammeter = ammeterInfo.Value; - - if (string.IsNullOrWhiteSpace(ammeter.ItemCodes)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101"); - continue; - } - - //载波的不处理 - if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102"); - continue; - } - - if (ammeter.State.Equals(2)) - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理"); - continue; - } - - ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器 - //if (!IsGennerateCmd(ammeter.LastTime, -1)) - //{ - // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令"); - // continue; - //} - - if (string.IsNullOrWhiteSpace(ammeter.AreaCode)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空"); - continue; - } - if (string.IsNullOrWhiteSpace(ammeter.Address)) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空"); - continue; - } - if (Convert.ToInt32(ammeter.Address) > 65535) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535"); - continue; - } - if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033) - { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})"); - continue; - } - - List tempCodes = ammeter.ItemCodes.Deserialize>()!; - - //TODO:自动上报数据只主动采集1类数据。 - if (ammeter.AutomaticReport.Equals(1)) - { - var tempSubCodes = new List(); - if (tempCodes.Contains("0C_49")) - { - tempSubCodes.Add("0C_49"); - } - - if (tempSubCodes.Contains("0C_149")) - { - tempSubCodes.Add("0C_149"); - } - - if (ammeter.ItemCodes.Contains("10_97")) - { - tempSubCodes.Add("10_97"); - } - - if (tempSubCodes == null || tempSubCodes.Count <= 0) - { - _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空"); - continue; - } - else - { - tempCodes = tempSubCodes; - } - } - - Dictionary keyValuePairs = new Dictionary(); - - foreach (var tempItem in tempCodes) - { - //排除已发送日冻结和月冻结采集项配置 - if (DayFreezeCodes.Contains(tempItem)) - { - continue; - } - - if (MonthFreezeCodes.Contains(tempItem)) - { - continue; - } - - var itemCodeArr = tempItem.Split('_'); - var aFNStr = itemCodeArr[0]; - var aFN = (AFN)aFNStr.HexToDec(); - var fn = int.Parse(itemCodeArr[1]); - byte[] dataInfos = null; - if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据) - { - //实时数据 - dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn); - } - else - { - string methonCode = $"AFN{aFNStr}_Fn_Send"; - //特殊表暂不处理 - if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode - , out var handler)) - { - dataInfos = handler(new TelemetryPacketRequest() - { - FocusAddress = ammeter.FocusAddress, - Fn = fn, - Pn = ammeter.MeteringCode - }); - } - else - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。"); - continue; - } - } - //TODO:特殊表 - - if (dataInfos == null || dataInfos.Length <= 0) - { - _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。"); - continue; - } - - - - var meterReadingRecords = new MeterReadingRecords() - { - ProjectID = ammeter.ProjectID, - DatabaseBusiID = ammeter.DatabaseBusiID, - PendingCopyReadTime = pendingCopyReadTime, - CreationTime = currentTime, - MeterAddress = ammeter.AmmerterAddress, - MeterId = ammeter.MeterId, - MeterType = MeterTypeEnum.Ammeter, - FocusAddress = ammeter.FocusAddress, - FocusID = ammeter.FocusId, - AFN = aFN, - Fn = fn, - ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode), - ManualOrNot = false, - Pn = ammeter.MeteringCode, - IssuedMessageId = GuidGenerator.Create().ToString(), - IssuedMessageHexString = Convert.ToHexString(dataInfos), - }; - //meterReadingRecords.CreateDataId(GuidGenerator.Create()); - - keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords); - } - await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); - } - } - } + #endregion diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 06b7d70..aba63e8 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -230,7 +230,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// /// 自动计算最优线程数 /// - private static int CalculateOptimalThreadCount() + public static int CalculateOptimalThreadCount() { int coreCount = Environment.ProcessorCount; return Math.Min( @@ -418,6 +418,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl { Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据"); } + + Console.WriteLine($"总共: {stats.Sum(d=>d.Count)} 条数据"); } }