From 871ed615a44b314872e90c5ed2a26e895fedfe02 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 17 Apr 2025 14:12:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusApplicationModule.cs | 4 +- .../RedisDataCache/RedisDataCacheService.cs | 174 ++++++------------ .../KafkaOptionConfig.cs | 2 +- 3 files changed, 61 insertions(+), 119 deletions(-) diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index f3ed978..1826fa4 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -22,6 +22,7 @@ using Volo.Abp.BackgroundWorkers; using Volo.Abp.BackgroundWorkers.Hangfire; using Volo.Abp.EventBus; using Volo.Abp.Modularity; +using Microsoft.Extensions.Options; namespace JiShe.CollectBus; @@ -69,13 +70,14 @@ public class CollectBusApplicationModule : AbpModule //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); var configuration = context.ServiceProvider.GetRequiredService(); + var kafkaOptions = context.ServiceProvider.GetRequiredService>(); List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); foreach (var item in topics) { - await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue(CommonConst.NumPartitions), configuration.GetValue(CommonConst.KafkaReplicationFactor)); + await kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor); } } diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 3c96410..0d9c80d 100644 --- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -389,79 +389,79 @@ namespace JiShe.CollectBus.RedisDataCache pageSize = Math.Clamp(pageSize, 1, 10000); const string luaScript = @" - local command = ARGV[1] - local range_start = ARGV[2] - local range_end = ARGV[3] - local limit = tonumber(ARGV[4]) - local last_score = ARGV[5] - local last_member = ARGV[6] + local command = ARGV[1] + local range_start = ARGV[2] + local range_end = ARGV[3] + local limit = tonumber(ARGV[4]) + local last_score = ARGV[5] + local last_member = ARGV[6] - -- 获取扩展数据(3倍分页大小) - local members - if command == 'ZRANGEBYSCORE' then - members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - else - members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, - 'WITHSCORES', 'LIMIT', 0, limit * 5) - end + -- 获取扩展数据(5倍分页大小) + local members + if command == 'ZRANGEBYSCORE' then + members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + end - -- 精确分页过滤 - local filtered = {} - local count = 0 - local start_index = 1 + -- 精确分页过滤 + local filtered = {} + local count = 0 + local start_index = 1 - -- 存在锚点时寻找起始位置 - if last_score ~= '' and last_member ~= '' then - for i=1,#members,2 do - local score = members[i+1] - local member = members[i] + -- 存在锚点时寻找起始位置 + if last_score ~= '' and last_member ~= '' then + for i=1,#members,2 do + local score = members[i+1] + local member = members[i] - if command == 'ZRANGEBYSCORE' then - if tonumber(score) > tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member > last_member then + if command == 'ZRANGEBYSCORE' then + if tonumber(score) > tonumber(last_score) then start_index = i break + elseif tonumber(score) == tonumber(last_score) then + if member > last_member then + start_index = i + break + end end - end - else - if tonumber(score) < tonumber(last_score) then - start_index = i - break - elseif tonumber(score) == tonumber(last_score) then - if member < last_member then + else + if tonumber(score) < tonumber(last_score) then start_index = i break + elseif tonumber(score) == tonumber(last_score) then + if member < last_member then + start_index = i + break + end end end end end - end - -- 收集有效数据 - for i=start_index,#members,2 do - if count >= limit then break end - table.insert(filtered, members[i]) - table.insert(filtered, members[i+1]) - count = count + 1 - end + -- 收集有效数据 + for i=start_index,#members,2 do + if count >= limit then break end + table.insert(filtered, members[i]) + table.insert(filtered, members[i+1]) + count = count + 1 + end - -- 提取有效数据 - local result_members = {} - local result_scores = {} - for i=1,#filtered,2 do - table.insert(result_members, filtered[i]) - table.insert(result_scores, filtered[i+1]) - end + -- 提取有效数据 + local result_members = {} + local result_scores = {} + for i=1,#filtered,2 do + table.insert(result_members, filtered[i]) + table.insert(result_scores, filtered[i+1]) + end - if #result_members == 0 then return {0,{},{},{}} end + if #result_members == 0 then return {0,{},{},{}} end - -- 获取Hash数据 - local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) - return {#result_members, result_members, result_scores, hash_data}"; + -- 获取Hash数据 + local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) + return {#result_members, result_members, result_scores, hash_data}"; // 构造查询范围(包含等于) string rangeStart, rangeEnd; @@ -540,67 +540,7 @@ namespace JiShe.CollectBus.RedisDataCache return new BusCacheGlobalPagedResult { Items = new List() }; } } - - /// - /// 并行分页导出方法(百万级数据支持) - /// - public async Task> FullExportParallel( - string hashKey, - string zsetKey, - int parallelDegree = 10, - int pageSize = 5000) where T : DeviceCacheBasicModel - { - var result = new ConcurrentBag(); - var totalCount = await GetTotalCount(zsetKey); - var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize); - - var semaphore = new SemaphoreSlim(parallelDegree); - var tasks = new List(); - - decimal? lastScore = null; - string lastMember = null; - var isDescending = true; - - for (int page = 0; page < totalPages; page++) - { - await semaphore.WaitAsync(); - - tasks.Add(Task.Run(async () => - { - try - { - var pageResult = await GetAllPagedData( - hashKey, - zsetKey, - pageSize, - lastScore, - lastMember, - isDescending); - - foreach (var item in pageResult.Items) - { - result.Add(item); - } - - // 更新分页锚点 - if (pageResult.HasNext) - { - lastScore = pageResult.NextScore; - lastMember = pageResult.NextMember; - } - } - finally - { - semaphore.Release(); - } - })); - } - - await Task.WhenAll(tasks); - return result.ToList(); - } - - + /// /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs index 28b80a5..e592ea2 100644 --- a/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs +++ b/src/JiShe.CollectBus.KafkaProducer/KafkaOptionConfig.cs @@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka /// /// kafka主题副本数量 /// - public int KafkaReplicationFactor { get; set; } + public short KafkaReplicationFactor { get; set; } /// /// kafka主题分区数量