移除无用代码,减少Kafka订阅线程数量

This commit is contained in:
ChenYi 2025-04-17 15:49:57 +08:00
parent 0ba64a4b90
commit d002472854
4 changed files with 56 additions and 226 deletions

View File

@@ -133,17 +133,6 @@ namespace JiShe.CollectBus.Application.Contracts
where T : DeviceCacheBasicModel; where T : DeviceCacheBasicModel;
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel;
///// <summary> ///// <summary>
///// 游标分页查询 ///// 游标分页查询
///// </summary> ///// </summary>

View File

@@ -366,181 +366,6 @@ namespace JiShe.CollectBus.RedisDataCache
throw new Exception(); throw new Exception();
} }
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel
{
// 参数校验
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
pageSize = Math.Clamp(pageSize, 1, 10000);
const string luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
-- 5
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
end
--
local filtered = {}
local count = 0
local start_index = 1
--
if last_score ~= '' and last_member ~= '' then
for i=1,#members,2 do
local score = members[i+1]
local member = members[i]
if command == 'ZRANGEBYSCORE' then
if tonumber(score) > tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member > last_member then
start_index = i
break
end
end
else
if tonumber(score) < tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member < last_member then
start_index = i
break
end
end
end
end
end
--
for i=start_index,#members,2 do
if count >= limit then break end
table.insert(filtered, members[i])
table.insert(filtered, members[i+1])
count = count + 1
end
--
local result_members = {}
local result_scores = {}
for i=1,#filtered,2 do
table.insert(result_members, filtered[i])
table.insert(result_scores, filtered[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 构造查询范围(包含等于)
string rangeStart, rangeEnd;
if (descending)
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
rangeEnd = "-inf";
}
else
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
rangeEnd = "+inf";
}
try
{
var scriptResult = (object[])await Instance.EvalAsync(
luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[]
{
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart,
rangeEnd,
pageSize,
lastScore?.ToString() ?? "",
lastMember ?? ""
});
var itemCount = (long)scriptResult[0];
if (itemCount == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>()
.Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
var validItems = members.AsParallel()
.Select((m, i) =>
{
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
catch { return null; }
})
.Where(x => x != null)
.ToList();
// 精确分页控制
var hasNext = validItems.Count >= pageSize;
var actualItems = validItems.Take(pageSize).ToList();
// 计算下一页锚点(必须基于原始排序)
decimal? nextScore = null;
string nextMember = null;
if (hasNext && actualItems.Count > 0)
{
var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
nextScore = scores[lastValidIndex];
nextMember = members[lastValidIndex];
}
return new BusCacheGlobalPagedResult<T>
{
Items = actualItems,
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember,
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
PageSize = pageSize
};
}
catch (Exception ex)
{
_logger.LogError(ex, "分页查询异常");
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
}
/// <summary> /// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。 /// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary> /// </summary>
@@ -562,17 +387,14 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel where T : DeviceCacheBasicModel
{ {
// 参数校验增强 // 参数校验增强
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{ {
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
return null; return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
} }
if (pageSize < 1 || pageSize > 10000) pageSize = Math.Clamp(pageSize, 1, 10000);
{
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
return null;
}
var luaScript = @" var luaScript = @"
local command = ARGV[1] local command = ARGV[1]

View File

@@ -169,9 +169,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//await DeviceGroupBalanceControl.ProcessGenericListAsync( //await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos, // items: meterInfos,
// deviceIdSelector: data => data.FocusAddress, // deviceIdSelector: data => data.FocusAddress,
// processor: (data, threadId) => // processor: (data, groupIndex) =>
// { // {
// _ = AmmerterCreatePublishTask(timeDensity, data); // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
// } // }
//); //);
@@ -180,9 +180,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync( await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos, items: meterInfos,
deviceIdSelector: data => data.FocusAddress, deviceIdSelector: data => data.FocusAddress,
processor: (data,groupIndex) => processor: (data, groupIndex) =>
{ {
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
} }
); );
@@ -282,7 +282,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
while (true) while (true)
{ {
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>( var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp, redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp, redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000, pageSize: 1000,
@@ -290,11 +290,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member); lastMember: member);
meterInfos.AddRange(page.Items); meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress)); focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
foreach (var item in page.Items) foreach (var item in page.Items)
{ {
if (!allIds.Add(item.MemberId)) if (!allIds.Add(item.MemberId))
throw new Exception("Duplicate data found!"); {
_logger.LogError($"{item.MemberId}Duplicate data found!");
}
} }
if (!page.HasNext) break; if (!page.HasNext) break;
score = page.NextScore; score = page.NextScore;
@@ -345,7 +347,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key); focusAddressDataList.Add(item.Key);
// var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG #if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题 //每次缓存时,删除缓存,避免缓存数据有不准确的问题
@@ -409,7 +411,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>( await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey, redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey, redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos); redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务 //在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel() TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@@ -710,7 +712,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="taskBatch">时间格式的任务批次名称</param> /// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns> /// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo,int groupIndex,string taskBatch) , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{ {
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
@@ -724,7 +726,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return; return;
} }
@@ -750,7 +752,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode)) if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{ {
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空"); // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return; return;
} }
if (string.IsNullOrWhiteSpace(ammeterInfo.Address)) if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
@@ -899,14 +901,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
return; return;
} }
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheTelemetryPacketInfoHashKey, using (var pipe = FreeRedisProvider.Instance.StartPipe())
redisCacheTelemetryPacketInfoSetIndexKey, {
redisCacheTelemetryPacketInfoZSetScoresIndexKey, foreach (var item in taskList)
taskList); {
// 主数据存储Hash
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
// Set索引缓存
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
// ZSET索引缓存Key
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
}
pipe.EndPipe();
}
//await _redisDataCacheService.BatchInsertDataAsync(
// redisCacheTelemetryPacketInfoHashKey,
// redisCacheTelemetryPacketInfoSetIndexKey,
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
// taskList);
} }
/// <summary> /// <summary>
@@ -924,7 +943,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
await _producerService.ProduceAsync(topicName, partition, taskRecord); await _producerService.ProduceAsync(topicName, partition, taskRecord);
} }
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)

View File

@@ -138,7 +138,7 @@ namespace JiShe.CollectBus.Kafka
int threadCount = 0; int threadCount = 0;
foreach (var sub in subscribedMethods) foreach (var sub in subscribedMethods)
{ {
int partitionCount = kafkaOptionConfig.NumPartitions; int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>(); //var adminClientService = provider.GetRequiredService<IAdminClientService>();
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount; //int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0) if (partitionCount <= 0)