提交代码

This commit is contained in:
ChenYi 2025-04-17 14:12:02 +08:00
parent 0584515df9
commit 871ed615a4
3 changed files with 61 additions and 119 deletions

View File

@ -22,6 +22,7 @@ using Volo.Abp.BackgroundWorkers;
using Volo.Abp.BackgroundWorkers.Hangfire;
using Volo.Abp.EventBus;
using Volo.Abp.Modularity;
using Microsoft.Extensions.Options;
namespace JiShe.CollectBus;
@ -69,13 +70,14 @@ public class CollectBusApplicationModule : AbpModule
//初始化主题信息
var kafkaAdminClient = context.ServiceProvider.GetRequiredService<IAdminClientService>();
var configuration = context.ServiceProvider.GetRequiredService<IConfiguration>();
var kafkaOptions = context.ServiceProvider.GetRequiredService<IOptions<KafkaOptionConfig>>();
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
foreach (var item in topics)
{
await kafkaAdminClient.CreateTopicAsync(item, configuration.GetValue<int>(CommonConst.NumPartitions), configuration.GetValue<short>(CommonConst.KafkaReplicationFactor));
await kafkaAdminClient.CreateTopicAsync(item, kafkaOptions.Value.NumPartitions, kafkaOptions.Value.KafkaReplicationFactor);
}
}

View File

@ -389,79 +389,79 @@ namespace JiShe.CollectBus.RedisDataCache
pageSize = Math.Clamp(pageSize, 1, 10000);
const string luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
-- 3
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
end
-- 5
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
end
--
local filtered = {}
local count = 0
local start_index = 1
--
local filtered = {}
local count = 0
local start_index = 1
--
if last_score ~= '' and last_member ~= '' then
for i=1,#members,2 do
local score = members[i+1]
local member = members[i]
--
if last_score ~= '' and last_member ~= '' then
for i=1,#members,2 do
local score = members[i+1]
local member = members[i]
if command == 'ZRANGEBYSCORE' then
if tonumber(score) > tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member > last_member then
if command == 'ZRANGEBYSCORE' then
if tonumber(score) > tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member > last_member then
start_index = i
break
end
end
end
else
if tonumber(score) < tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member < last_member then
else
if tonumber(score) < tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member < last_member then
start_index = i
break
end
end
end
end
end
end
--
for i=start_index,#members,2 do
if count >= limit then break end
table.insert(filtered, members[i])
table.insert(filtered, members[i+1])
count = count + 1
end
--
for i=start_index,#members,2 do
if count >= limit then break end
table.insert(filtered, members[i])
table.insert(filtered, members[i+1])
count = count + 1
end
--
local result_members = {}
local result_scores = {}
for i=1,#filtered,2 do
table.insert(result_members, filtered[i])
table.insert(result_scores, filtered[i+1])
end
--
local result_members = {}
local result_scores = {}
for i=1,#filtered,2 do
table.insert(result_members, filtered[i])
table.insert(result_scores, filtered[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end
if #result_members == 0 then return {0,{},{},{}} end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 构造查询范围(包含等于)
string rangeStart, rangeEnd;
@ -541,66 +541,6 @@ namespace JiShe.CollectBus.RedisDataCache
}
}
/// <summary>
/// 并行分页导出方法(百万级数据支持)
/// </summary>
public async Task<List<T>> FullExportParallel<T>(
string hashKey,
string zsetKey,
int parallelDegree = 10,
int pageSize = 5000) where T : DeviceCacheBasicModel
{
var result = new ConcurrentBag<T>();
var totalCount = await GetTotalCount(zsetKey);
var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize);
var semaphore = new SemaphoreSlim(parallelDegree);
var tasks = new List<Task>();
decimal? lastScore = null;
string lastMember = null;
var isDescending = true;
for (int page = 0; page < totalPages; page++)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
var pageResult = await GetAllPagedData<T>(
hashKey,
zsetKey,
pageSize,
lastScore,
lastMember,
isDescending);
foreach (var item in pageResult.Items)
{
result.Add(item);
}
// 更新分页锚点
if (pageResult.HasNext)
{
lastScore = pageResult.NextScore;
lastMember = pageResult.NextMember;
}
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
return result.ToList();
}
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>

View File

@ -22,7 +22,7 @@ namespace JiShe.CollectBus.Kafka
/// <summary>
/// kafka主题副本数量
/// </summary>
public int KafkaReplicationFactor { get; set; }
public short KafkaReplicationFactor { get; set; }
/// <summary>
/// kafka主题分区数量