Merge branch 'dev' into zhy_feat_dev_v2

This commit is contained in:
zenghongyao 2025-04-17 18:09:21 +08:00
commit f11e514ef4
7 changed files with 164 additions and 532 deletions

View File

@ -39,8 +39,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -115,10 +113,6 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -141,7 +135,6 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}

View File

@ -133,17 +133,6 @@ namespace JiShe.CollectBus.Application.Contracts
where T : DeviceCacheBasicModel;
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel;
///// <summary>
///// 游标分页查询
///// </summary>

View File

@ -366,181 +366,6 @@ namespace JiShe.CollectBus.RedisDataCache
throw new Exception();
}
/// <summary>
/// 优化后的分页获取方法(支持百万级数据)
/// </summary>
public async Task<BusCacheGlobalPagedResult<T>> GetAllPagedDataOptimized<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
int pageSize = 1000,
decimal? lastScore = null,
string lastMember = null,
bool descending = true) where T : DeviceCacheBasicModel
{
// 参数校验
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
pageSize = Math.Clamp(pageSize, 1, 10000);
const string luaScript = @"
local command = ARGV[1]
local range_start = ARGV[2]
local range_end = ARGV[3]
local limit = tonumber(ARGV[4])
local last_score = ARGV[5]
local last_member = ARGV[6]
-- 5
local members
if command == 'ZRANGEBYSCORE' then
members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
else
members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
'WITHSCORES', 'LIMIT', 0, limit * 5)
end
--
local filtered = {}
local count = 0
local start_index = 1
--
if last_score ~= '' and last_member ~= '' then
for i=1,#members,2 do
local score = members[i+1]
local member = members[i]
if command == 'ZRANGEBYSCORE' then
if tonumber(score) > tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member > last_member then
start_index = i
break
end
end
else
if tonumber(score) < tonumber(last_score) then
start_index = i
break
elseif tonumber(score) == tonumber(last_score) then
if member < last_member then
start_index = i
break
end
end
end
end
end
--
for i=start_index,#members,2 do
if count >= limit then break end
table.insert(filtered, members[i])
table.insert(filtered, members[i+1])
count = count + 1
end
--
local result_members = {}
local result_scores = {}
for i=1,#filtered,2 do
table.insert(result_members, filtered[i])
table.insert(result_scores, filtered[i+1])
end
if #result_members == 0 then return {0,{},{},{}} end
-- Hash数据
local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
return {#result_members, result_members, result_scores, hash_data}";
// 构造查询范围(包含等于)
string rangeStart, rangeEnd;
if (descending)
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
rangeEnd = "-inf";
}
else
{
rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
rangeEnd = "+inf";
}
try
{
var scriptResult = (object[])await Instance.EvalAsync(
luaScript,
new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
new object[]
{
descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
rangeStart,
rangeEnd,
pageSize,
lastScore?.ToString() ?? "",
lastMember ?? ""
});
var itemCount = (long)scriptResult[0];
if (itemCount == 0)
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
// 处理结果集
var members = ((object[])scriptResult[1]).Cast<string>().ToList();
var scores = ((object[])scriptResult[2]).Cast<string>()
.Select(decimal.Parse).ToList();
var hashData = ((object[])scriptResult[3]).Cast<string>().ToList();
var validItems = members.AsParallel()
.Select((m, i) =>
{
try { return BusJsonSerializer.Deserialize<T>(hashData[i]); }
catch { return null; }
})
.Where(x => x != null)
.ToList();
// 精确分页控制
var hasNext = validItems.Count >= pageSize;
var actualItems = validItems.Take(pageSize).ToList();
// 计算下一页锚点(必须基于原始排序)
decimal? nextScore = null;
string nextMember = null;
if (hasNext && actualItems.Count > 0)
{
var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
nextScore = scores[lastValidIndex];
nextMember = members[lastValidIndex];
}
return new BusCacheGlobalPagedResult<T>
{
Items = actualItems,
HasNext = hasNext,
NextScore = nextScore,
NextMember = nextMember,
TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
PageSize = pageSize
};
}
catch (Exception ex)
{
_logger.LogError(ex, "分页查询异常");
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
}
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>
@ -562,17 +387,14 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel
{
// 参数校验增强
if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
return null;
return new BusCacheGlobalPagedResult<T> { Items = new List<T>() };
}
if (pageSize < 1 || pageSize > 10000)
{
_logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间-102");
return null;
}
pageSize = Math.Clamp(pageSize, 1, 10000);
var luaScript = @"
local command = ARGV[1]

View File

@ -226,6 +226,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos)
{
focusAddressDataLista.Add(item.FocusAddress);
}
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}

View File

@ -25,6 +25,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
@ -169,9 +170,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
// processor: (data, threadId) =>
// processor: (data, groupIndex) =>
// {
// _ = AmmerterCreatePublishTask(timeDensity, data);
// _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
// }
//);
@ -180,9 +181,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data,groupIndex) =>
processor: (data, groupIndex) =>
{
_ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
}
);
@ -282,7 +283,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
while (true)
{
var page = await _redisDataCacheService.GetAllPagedDataOptimized<AmmeterInfo>(
var page = await _redisDataCacheService.GetAllPagedData<AmmeterInfo>(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
@ -290,11 +291,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
throw new Exception("Duplicate data found!");
{
_logger.LogError($"{item.MemberId}Duplicate data found!");
}
}
if (!page.HasNext) break;
score = page.NextScore;
@ -345,7 +348,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key);
// var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
// var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
@ -409,7 +412,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
//在缓存表信息数据的时候新增下一个时间的自动处理任务1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@ -580,125 +583,103 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 15;
var currentDateTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
return;
}
// 自动计算最佳并发度
int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
//获取下发任务缓存数据
Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
var options = new ParallelOptions
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
return;
}
List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
//将取出的缓存任务数据发送到Kafka消息队列中
foreach (var focusItem in meterTaskInfos)
MaxDegreeOfParallelism = recommendedThreads,
};
string taskBatch = "20250417155016";
Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
foreach (var ammerterItem in focusItem.Value)
Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
List<MeterReadingTelemetryPacketInfo> meterInfos = new List<MeterReadingTelemetryPacketInfo>();
decimal? cursor = null;
string member = null;
bool hasNext;
do
{
var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
{
MessageHexString = ammerterItem.Value.IssuedMessageHexString,
MessageId = ammerterItem.Value.IssuedMessageId,
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
//_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
var page = await _redisDataCacheService.GetAllPagedData<MeterReadingTelemetryPacketInfo>(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
pageSize: 1000,
lastScore: cursor,
lastMember: member);
_ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
meterInfos.AddRange(page.Items);
cursor = page.HasNext ? page.NextScore : null;
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
//_ = _producerBus.Publish(tempMsg);
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
_= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
}
);
meterTaskInfosList.Add(ammerterItem.Value);
}
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
} while (hasNext);
});
stopwatch.Stop();
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
//var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
//var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
//if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
// return;
//}
////获取下发任务缓存数据
//Dictionary<string, Dictionary<string, MeterReadingRecords>> meterTaskInfos = await GetMeterRedisCacheDictionaryData<MeterReadingRecords>(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
//if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
//{
// _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
// return;
//}
//List<MeterReadingRecords> meterTaskInfosList = new List<MeterReadingRecords>();
////将取出的缓存任务数据发送到Kafka消息队列中
//foreach (var focusItem in meterTaskInfos)
//{
// foreach (var ammerterItem in focusItem.Value)
// {
// var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
// {
// MessageHexString = ammerterItem.Value.IssuedMessageHexString,
// MessageId = ammerterItem.Value.IssuedMessageId,
// FocusAddress = ammerterItem.Value.FocusAddress,
// TimeDensity = timeDensity.ToString(),
// };
// //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
// //_ = _producerBus.Publish(tempMsg);
// meterTaskInfosList.Add(ammerterItem.Value);
// }
//}
//if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
//{
// await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
//}
//stopwatch.Stop();
//_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
/// 电表采集任务指令创建
/// </summary>
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="focusGroup">集中器数据分组</param>
/// <returns></returns>
private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{
if (timeDensity <= 0)
{
timeDensity = 1;
}
if (timeDensity > 15)
{
timeDensity = 15;
}
if (focusGroup == null || focusGroup.Count <= 0)
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
return;
}
try
{
//将采集器编号的hash值取模分组
const int TotalShards = 1024;
var focusHashGroups = new Dictionary<int, Dictionary<string, Dictionary<string, AmmeterInfo>>>();
foreach (var (collectorId, ammetersDictionary) in focusGroup)
{
if (string.IsNullOrWhiteSpace(collectorId))
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败无效Key -102");
continue;
}
// 计算哈希分组ID
int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
// 获取或创建分组(避免重复查找)
if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
{
group = new Dictionary<string, Dictionary<string, AmmeterInfo>>();
focusHashGroups[hashGroupId] = group;
}
// 将当前集中器数据加入分组
group[collectorId] = ammetersDictionary;
}
if (focusHashGroups == null)
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
return;
}
//根据分组创建线程批处理集中器
foreach (var group in focusHashGroups)
{
await AmmerterCreatePublishTask2(timeDensity, group.Value);
}
}
catch (Exception)
{
throw;
}
}
/// <summary>
@ -709,22 +690,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <param name="groupIndex">集中器所在分组</param>
/// <param name="taskBatch">时间格式的任务批次名称</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
private void AmmerterCreatePublishTask(int timeDensity
, AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return;
}
@ -750,7 +731,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
// _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
@ -899,14 +880,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}{redisCacheTelemetryPacketInfoSetIndexKey}{redisCacheTelemetryPacketInfoZSetScoresIndexKey}-101");
return;
}
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheTelemetryPacketInfoHashKey,
redisCacheTelemetryPacketInfoSetIndexKey,
redisCacheTelemetryPacketInfoZSetScoresIndexKey,
taskList);
using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
foreach (var item in taskList)
{
// 主数据存储Hash
pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
// Set索引缓存
pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
// ZSET索引缓存Key
pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
}
pipe.EndPipe();
}
//await _redisDataCacheService.BatchInsertDataAsync(
// redisCacheTelemetryPacketInfoHashKey,
// redisCacheTelemetryPacketInfoSetIndexKey,
// redisCacheTelemetryPacketInfoZSetScoresIndexKey,
// taskList);
}
/// <summary>
@ -914,17 +912,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary>
/// <param name="topicName">主题名称</param>
/// <param name="taskRecord">任务记录</param>
/// <param name="partition">对应分区,也就是集中器号所在的分组序号</param>
/// <returns></returns>
private async Task KafkaProducerIssuedMessage(string topicName,
MeterReadingRecords taskRecord)
MeterReadingTelemetryPacketInfo taskRecord,int partition)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
await _producerService.ProduceAsync(topicName, partition, taskRecord);
await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@ -979,191 +977,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
/// <summary>
/// 电表创建发布任务
/// </summary>
/// <param name="timeDensity">采集频率</param>
/// <param name="focusGroup">集中器号hash分组的集中器集合数据</param>
/// <returns></returns>
private async Task AmmerterCreatePublishTask2(int timeDensity
, Dictionary<string, Dictionary<string, AmmeterInfo>> focusGroup)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
foreach (var focusInfo in focusGroup)
{
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
foreach (var ammeterInfo in focusInfo.Value)
{
var ammeter = ammeterInfo.Value;
if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
continue;
}
//载波的不处理
if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
continue;
}
if (ammeter.State.Equals(2))
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
continue;
}
////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
//if (!IsGennerateCmd(ammeter.LastTime, -1))
//{
// _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime}已超过1天未在线不生成指令");
// continue;
//}
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
continue;
}
if (string.IsNullOrWhiteSpace(ammeter.Address))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
continue;
}
if (Convert.ToInt32(ammeter.Address) > 65535)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
continue;
}
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
continue;
}
List<string> tempCodes = ammeter.ItemCodes.Deserialize<List<string>>()!;
//TODO:自动上报数据只主动采集1类数据。
if (ammeter.AutomaticReport.Equals(1))
{
var tempSubCodes = new List<string>();
if (tempCodes.Contains("0C_49"))
{
tempSubCodes.Add("0C_49");
}
if (tempSubCodes.Contains("0C_149"))
{
tempSubCodes.Add("0C_149");
}
if (ammeter.ItemCodes.Contains("10_97"))
{
tempSubCodes.Add("10_97");
}
if (tempSubCodes == null || tempSubCodes.Count <= 0)
{
_logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
continue;
}
else
{
tempCodes = tempSubCodes;
}
}
Dictionary<string, MeterReadingRecords> keyValuePairs = new Dictionary<string, MeterReadingRecords>();
foreach (var tempItem in tempCodes)
{
//排除已发送日冻结和月冻结采集项配置
if (DayFreezeCodes.Contains(tempItem))
{
continue;
}
if (MonthFreezeCodes.Contains(tempItem))
{
continue;
}
var itemCodeArr = tempItem.Split('_');
var aFNStr = itemCodeArr[0];
var aFN = (AFN)aFNStr.HexToDec();
var fn = int.Parse(itemCodeArr[1]);
byte[] dataInfos = null;
if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.)
{
//实时数据
dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
}
else
{
string methonCode = $"AFN{aFNStr}_Fn_Send";
//特殊表暂不处理
if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
, out var handler))
{
dataInfos = handler(new TelemetryPacketRequest()
{
FocusAddress = ammeter.FocusAddress,
Fn = fn,
Pn = ammeter.MeteringCode
});
}
else
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。");
continue;
}
}
//TODO:特殊表
if (dataInfos == null || dataInfos.Length <= 0)
{
_logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
continue;
}
var meterReadingRecords = new MeterReadingRecords()
{
ProjectID = ammeter.ProjectID,
DatabaseBusiID = ammeter.DatabaseBusiID,
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress,
FocusID = ammeter.FocusId,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
ManualOrNot = false,
Pn = ammeter.MeteringCode,
IssuedMessageId = GuidGenerator.Create().ToString(),
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
//meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
}
}
#endregion

View File

@ -230,7 +230,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
/// <summary>
/// 自动计算最优线程数
/// </summary>
private static int CalculateOptimalThreadCount()
public static int CalculateOptimalThreadCount()
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
@ -418,6 +418,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
}
Console.WriteLine($"总共: {stats.Sum(d=>d.Count)} 条数据");
}
}

View File

@ -140,7 +140,7 @@ namespace JiShe.CollectBus.Kafka
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
int partitionCount = kafkaOptionConfig.NumPartitions;
int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService<IAdminClientService>();
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)