diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln
index ea5e278..9c67aae 100644
--- a/JiShe.CollectBus.sln
+++ b/JiShe.CollectBus.sln
@@ -39,8 +39,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Protocol.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Cassandra", "src\JiShe.CollectBus.Cassandra\JiShe.CollectBus.Cassandra.csproj", "{443B4549-0AC0-4493-8F3E-49C83225DD76}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka.Test", "src\JiShe.CollectBus.Kafka.Test\JiShe.CollectBus.Kafka.Test.csproj", "{FA762E8F-659A-DECF-83D6-5F364144450E}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -115,10 +113,6 @@ Global
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Debug|Any CPU.Build.0 = Debug|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.ActiveCfg = Release|Any CPU
{443B4549-0AC0-4493-8F3E-49C83225DD76}.Release|Any CPU.Build.0 = Release|Any CPU
- {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {FA762E8F-659A-DECF-83D6-5F364144450E}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {FA762E8F-659A-DECF-83D6-5F364144450E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -141,7 +135,6 @@ Global
{A3F3C092-0A25-450B-BF6A-5983163CBEF5} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{A377955E-7EA1-6F29-8CF7-774569E93925} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
{443B4549-0AC0-4493-8F3E-49C83225DD76} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
- {FA762E8F-659A-DECF-83D6-5F364144450E} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD}
diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
index 5f0e8f8..eb400c7 100644
--- a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
+++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
@@ -132,18 +132,7 @@ namespace JiShe.CollectBus.Application.Contracts
bool descending = true)
where T : DeviceCacheBasicModel;
-
- ///
- /// 优化后的分页获取方法(支持百万级数据)
- ///
- Task> GetAllPagedDataOptimized(
- string redisHashCacheKey,
- string redisZSetScoresIndexCacheKey,
- int pageSize = 1000,
- decimal? lastScore = null,
- string lastMember = null,
- bool descending = true) where T : DeviceCacheBasicModel;
-
+
/////
///// 游标分页查询
/////
diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
index 0d9c80d..43dae23 100644
--- a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
+++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
@@ -365,182 +365,7 @@ namespace JiShe.CollectBus.RedisDataCache
{
throw new Exception();
}
-
-
- ///
- /// 优化后的分页获取方法(支持百万级数据)
- ///
- public async Task> GetAllPagedDataOptimized(
- string redisHashCacheKey,
- string redisZSetScoresIndexCacheKey,
- int pageSize = 1000,
- decimal? lastScore = null,
- string lastMember = null,
- bool descending = true) where T : DeviceCacheBasicModel
- {
- // 参数校验
- if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
- string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
- {
- _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized));
- return new BusCacheGlobalPagedResult { Items = new List() };
- }
-
- pageSize = Math.Clamp(pageSize, 1, 10000);
-
- const string luaScript = @"
- local command = ARGV[1]
- local range_start = ARGV[2]
- local range_end = ARGV[3]
- local limit = tonumber(ARGV[4])
- local last_score = ARGV[5]
- local last_member = ARGV[6]
-
- -- 获取扩展数据(5倍分页大小)
- local members
- if command == 'ZRANGEBYSCORE' then
- members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end,
- 'WITHSCORES', 'LIMIT', 0, limit * 5)
- else
- members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end,
- 'WITHSCORES', 'LIMIT', 0, limit * 5)
- end
-
- -- 精确分页过滤
- local filtered = {}
- local count = 0
- local start_index = 1
-
- -- 存在锚点时寻找起始位置
- if last_score ~= '' and last_member ~= '' then
- for i=1,#members,2 do
- local score = members[i+1]
- local member = members[i]
-
- if command == 'ZRANGEBYSCORE' then
- if tonumber(score) > tonumber(last_score) then
- start_index = i
- break
- elseif tonumber(score) == tonumber(last_score) then
- if member > last_member then
- start_index = i
- break
- end
- end
- else
- if tonumber(score) < tonumber(last_score) then
- start_index = i
- break
- elseif tonumber(score) == tonumber(last_score) then
- if member < last_member then
- start_index = i
- break
- end
- end
- end
- end
- end
-
- -- 收集有效数据
- for i=start_index,#members,2 do
- if count >= limit then break end
- table.insert(filtered, members[i])
- table.insert(filtered, members[i+1])
- count = count + 1
- end
-
- -- 提取有效数据
- local result_members = {}
- local result_scores = {}
- for i=1,#filtered,2 do
- table.insert(result_members, filtered[i])
- table.insert(result_scores, filtered[i+1])
- end
-
- if #result_members == 0 then return {0,{},{},{}} end
-
- -- 获取Hash数据
- local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
- return {#result_members, result_members, result_scores, hash_data}";
-
- // 构造查询范围(包含等于)
- string rangeStart, rangeEnd;
- if (descending)
- {
- rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf";
- rangeEnd = "-inf";
- }
- else
- {
- rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf";
- rangeEnd = "+inf";
- }
-
- try
- {
- var scriptResult = (object[])await Instance.EvalAsync(
- luaScript,
- new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
- new object[]
- {
- descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
- rangeStart,
- rangeEnd,
- pageSize,
- lastScore?.ToString() ?? "",
- lastMember ?? ""
- });
-
- var itemCount = (long)scriptResult[0];
- if (itemCount == 0)
- return new BusCacheGlobalPagedResult { Items = new List() };
-
- // 处理结果集
- var members = ((object[])scriptResult[1]).Cast().ToList();
- var scores = ((object[])scriptResult[2]).Cast()
- .Select(decimal.Parse).ToList();
- var hashData = ((object[])scriptResult[3]).Cast().ToList();
-
- var validItems = members.AsParallel()
- .Select((m, i) =>
- {
- try { return BusJsonSerializer.Deserialize(hashData[i]); }
- catch { return null; }
- })
- .Where(x => x != null)
- .ToList();
-
- // 精确分页控制
- var hasNext = validItems.Count >= pageSize;
- var actualItems = validItems.Take(pageSize).ToList();
-
- // 计算下一页锚点(必须基于原始排序)
- decimal? nextScore = null;
- string nextMember = null;
- if (hasNext && actualItems.Count > 0)
- {
- var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1);
- nextScore = scores[lastValidIndex];
- nextMember = members[lastValidIndex];
- }
-
- return new BusCacheGlobalPagedResult
- {
- Items = actualItems,
- HasNext = hasNext,
- NextScore = nextScore,
- NextMember = nextMember,
- TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
- PageSize = pageSize
- };
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "分页查询异常");
- return new BusCacheGlobalPagedResult { Items = new List() };
- }
- }
-
+
///
/// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。
///
@@ -562,17 +387,14 @@ namespace JiShe.CollectBus.RedisDataCache
where T : DeviceCacheBasicModel
{
// 参数校验增强
- if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ if (string.IsNullOrWhiteSpace(redisHashCacheKey) ||
+ string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
- return null;
+ return new BusCacheGlobalPagedResult { Items = new List() };
}
- if (pageSize < 1 || pageSize > 10000)
- {
- _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
- return null;
- }
+ pageSize = Math.Clamp(pageSize, 1, 10000);
var luaScript = @"
local command = ARGV[1]
diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index 6eba059..169b3f8 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -226,6 +226,19 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
+
+ List focusAddressDataLista = new List();
+ foreach (var item in meterInfos)
+ {
+ focusAddressDataLista.Add(item.FocusAddress);
+ }
+
+ DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+
+ // 打印分布统计
+ DeviceGroupBalanceControl.PrintDistributionStats();
+
+ await Task.CompletedTask;
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 9184a62..97bcc50 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -25,6 +25,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using static FreeSql.Internal.GlobalFilter;
@@ -47,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IRedisDataCacheService redisDataCacheService,
- IIoTDBProvider dbProvider,
+ IIoTDBProvider dbProvider,
IOptions kafkaOptions)
{
_logger = logger;
@@ -125,7 +126,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
}
-
+
var meterTypes = EnumExtensions.ToEnumDictionary();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
@@ -155,7 +156,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
member = page.HasNext ? page.NextMember : null;
hasNext = page.HasNext;
} while (hasNext);
-
+
if (meterInfos == null || meterInfos.Count <= 0)
{
timer.Stop();
@@ -169,9 +170,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//await DeviceGroupBalanceControl.ProcessGenericListAsync(
// items: meterInfos,
// deviceIdSelector: data => data.FocusAddress,
- // processor: (data, threadId) =>
+ // processor: (data, groupIndex) =>
// {
- // _ = AmmerterCreatePublishTask(timeDensity, data);
+ // _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
// }
//);
@@ -180,9 +181,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
- processor: (data,groupIndex) =>
+ processor: (data, groupIndex) =>
{
- _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
+ AmmerterCreatePublishTask(timeDensity, data, groupIndex, tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss"));
}
);
@@ -247,9 +248,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// meterInfos.Add(tempData);
// //focusAddressDataLista.Add(item.FocusAddress);
//}
-
-
-
+
+
+
var timeDensity = "15";
var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
@@ -282,7 +283,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
while (true)
{
- var page = await _redisDataCacheService.GetAllPagedDataOptimized(
+ var page = await _redisDataCacheService.GetAllPagedData(
redisCacheMeterInfoHashKeyTemp,
redisCacheMeterInfoZSetScoresIndexKeyTemp,
pageSize: 1000,
@@ -290,22 +291,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member);
meterInfos.AddRange(page.Items);
- focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress));
+ focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
- throw new Exception("Duplicate data found!");
+ {
+ _logger.LogError($"{item.MemberId}Duplicate data found!");
+ }
}
if (!page.HasNext) break;
score = page.NextScore;
member = page.NextMember;
}
-
+
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
- //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
- //return;
+ DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
+ return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@@ -331,7 +334,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
List ammeterInfos = new List();
//将表计信息根据集中器分组,获得集中器号
@@ -345,7 +348,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
focusAddressDataList.Add(item.Key);
- // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
+ // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
@@ -409,7 +412,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
- redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
+ redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@@ -580,126 +583,104 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 15;
var currentDateTime = DateTime.Now;
- var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
- var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
- {
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
- return;
- }
+ // 自动计算最佳并发度
+ int recommendedThreads = DeviceGroupBalanceControl.CalculateOptimalThreadCount();
- //获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
- if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
+ var options = new ParallelOptions
{
- _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
- return;
- }
-
- List meterTaskInfosList = new List();
-
- //将取出的缓存任务数据发送到Kafka消息队列中
- foreach (var focusItem in meterTaskInfos)
+ MaxDegreeOfParallelism = recommendedThreads,
+ };
+ string taskBatch = "20250417155016";
+ Parallel.For(0, _kafkaOptions.NumPartitions, options, async groupIndex =>
{
- foreach (var ammerterItem in focusItem.Value)
+ Console.WriteLine($"15分钟采集电表数据:{groupIndex}");
+ var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+ var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
+
+ List meterInfos = new List();
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ do
{
- var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
- {
- MessageHexString = ammerterItem.Value.IssuedMessageHexString,
- MessageId = ammerterItem.Value.IssuedMessageId,
- FocusAddress = ammerterItem.Value.FocusAddress,
- TimeDensity = timeDensity.ToString(),
- };
- //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheTelemetryPacketInfoHashKey,
+ redisCacheTelemetryPacketInfoZSetScoresIndexKey,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
- _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
- //_ = _producerBus.Publish(tempMsg);
+ await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
+ items: meterInfos,
+ deviceIdSelector: data => data.FocusAddress,
+ processor: (data, groupIndex) =>
+ {
+ _= KafkaProducerIssuedMessage(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName,data, groupIndex);
+ }
+ );
- meterTaskInfosList.Add(ammerterItem.Value);
- }
- }
- if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
- {
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
- }
-
+ } while (hasNext);
+ });
- stopwatch.Stop();
+
- _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
- }
-
- ///
- /// 电表采集任务指令创建
- ///
- /// 采集频率1分钟、5分钟、15分钟
- /// 集中器数据分组
- ///
- private async Task AmmerterScheduledMeterReadingIssued(int timeDensity, Dictionary> focusGroup)
- {
- if (timeDensity <= 0)
- {
- timeDensity = 1;
- }
-
- if (timeDensity > 15)
- {
- timeDensity = 15;
- }
-
- if (focusGroup == null || focusGroup.Count <= 0)
- {
- _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
- return;
- }
- try
- {
- //将采集器编号的hash值取模分组
- const int TotalShards = 1024;
- var focusHashGroups = new Dictionary>>();
-
- foreach (var (collectorId, ammetersDictionary) in focusGroup)
- {
- if (string.IsNullOrWhiteSpace(collectorId))
- {
- _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败,无效Key -102");
- continue;
- }
-
- // 计算哈希分组ID
- int hashGroupId = Math.Abs(collectorId.GetHashCode() % TotalShards);
-
- // 获取或创建分组(避免重复查找)
- if (!focusHashGroups.TryGetValue(hashGroupId, out var group))
- {
- group = new Dictionary>();
- focusHashGroups[hashGroupId] = group;
- }
-
- // 将当前集中器数据加入分组
- group[collectorId] = ammetersDictionary;
- }
-
- if (focusHashGroups == null)
- {
- _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 集中器信息分组取模失败 -103");
- return;
- }
-
- //根据分组创建线程批处理集中器
- foreach (var group in focusHashGroups)
- {
- await AmmerterCreatePublishTask2(timeDensity, group.Value);
- }
- }
- catch (Exception)
- {
-
- throw;
- }
+ //var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
+ //var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+ //if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
+ //{
+ // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-101");
+ // return;
+ //}
+
+ ////获取下发任务缓存数据
+ //Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter);
+ //if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
+ //{
+ // _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102");
+ // return;
+ //}
+
+ //List meterTaskInfosList = new List();
+
+ ////将取出的缓存任务数据发送到Kafka消息队列中
+ //foreach (var focusItem in meterTaskInfos)
+ //{
+ // foreach (var ammerterItem in focusItem.Value)
+ // {
+ // var tempMsg = new ScheduledMeterReadingIssuedEventMessage()
+ // {
+ // MessageHexString = ammerterItem.Value.IssuedMessageHexString,
+ // MessageId = ammerterItem.Value.IssuedMessageId,
+ // FocusAddress = ammerterItem.Value.FocusAddress,
+ // TimeDensity = timeDensity.ToString(),
+ // };
+ // //_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+
+ // _ = _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+
+ // //_ = _producerBus.Publish(tempMsg);
+
+ // meterTaskInfosList.Add(ammerterItem.Value);
+ // }
+ //}
+ //if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
+ //{
+ // await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
+ //}
+
+
+ //stopwatch.Stop();
+
+ //_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟采集电表数据处理完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
+
///
/// 电表创建发布任务
@@ -709,22 +690,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// 集中器所在分组
/// 时间格式的任务批次名称
///
- private async Task AmmerterCreatePublishTask(int timeDensity
- , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch)
+ private void AmmerterCreatePublishTask(int timeDensity
+ , AmmeterInfo ammeterInfo, int groupIndex, string taskBatch)
{
var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
- //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
+
var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
- // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
+ // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
return;
}
@@ -750,7 +731,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeterInfo.AreaCode))
{
- // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
+ // _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeterInfo.ID},集中器通信区号为空");
return;
}
if (string.IsNullOrWhiteSpace(ammeterInfo.Address))
@@ -899,14 +880,31 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey))
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} {ammeterInfo.Name}的写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101");
return;
}
- await _redisDataCacheService.BatchInsertDataAsync(
- redisCacheTelemetryPacketInfoHashKey,
- redisCacheTelemetryPacketInfoSetIndexKey,
- redisCacheTelemetryPacketInfoZSetScoresIndexKey,
- taskList);
+
+ using (var pipe = FreeRedisProvider.Instance.StartPipe())
+ {
+ foreach (var item in taskList)
+ {
+ // 主数据存储Hash
+ pipe.HSet(redisCacheTelemetryPacketInfoHashKey, item.MemberId, item.Serialize());
+
+ // Set索引缓存
+ pipe.SAdd(redisCacheTelemetryPacketInfoSetIndexKey, item.MemberId);
+
+ // ZSET索引缓存Key
+ pipe.ZAdd(redisCacheTelemetryPacketInfoZSetScoresIndexKey, item.ScoreValue, item.MemberId);
+ }
+ pipe.EndPipe();
+ }
+
+ //await _redisDataCacheService.BatchInsertDataAsync(
+ // redisCacheTelemetryPacketInfoHashKey,
+ // redisCacheTelemetryPacketInfoSetIndexKey,
+ // redisCacheTelemetryPacketInfoZSetScoresIndexKey,
+ // taskList);
}
///
@@ -914,17 +912,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
/// 主题名称
/// 任务记录
+ /// 对应分区,也就是集中器号所在的分组序号
///
private async Task KafkaProducerIssuedMessage(string topicName,
- MeterReadingRecords taskRecord)
+ MeterReadingTelemetryPacketInfo taskRecord,int partition)
{
if (string.IsNullOrWhiteSpace(topicName) || taskRecord == null)
{
throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101");
}
- int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress);
-
- await _producerService.ProduceAsync(topicName, partition, taskRecord);
+
+ await _producerService.ProduceAsync(topicName, partition, taskRecord);
}
private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType)
@@ -978,192 +976,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
}
-
- ///
- /// 电表创建发布任务
- ///
- /// 采集频率
- /// 集中器号hash分组的集中器集合数据
- ///
- private async Task AmmerterCreatePublishTask2(int timeDensity
- , Dictionary> focusGroup)
- {
- var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary;
- //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
-
- var currentTime = DateTime.Now;
- var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
- foreach (var focusInfo in focusGroup)
- {
- //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
- var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
-
- foreach (var ammeterInfo in focusInfo.Value)
- {
- var ammeter = ammeterInfo.Value;
-
- if (string.IsNullOrWhiteSpace(ammeter.ItemCodes))
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,采集项为空,-101");
- continue;
- }
-
- //载波的不处理
- if (ammeter.MeteringPort == (int)MeterLinkProtocolEnum.Carrierwave)
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}数据采集指令生成失败,载波不处理,-102");
- continue;
- }
-
- if (ammeter.State.Equals(2))
- {
- _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} {ammeter.Name} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}状态为禁用,不处理");
- continue;
- }
-
- ////排除1天未在线的集中器生成指令 或 排除集中器配置为自动上报的集中器
- //if (!IsGennerateCmd(ammeter.LastTime, -1))
- //{
- // _logger.LogInformation($"{nameof(CreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name},采集时间:{ammeter.LastTime},已超过1天未在线,不生成指令");
- // continue;
- //}
-
- if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
- continue;
- }
- if (string.IsNullOrWhiteSpace(ammeter.Address))
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
- continue;
- }
- if (Convert.ToInt32(ammeter.Address) > 65535)
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
- continue;
- }
- if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
- {
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
- continue;
- }
-
- List tempCodes = ammeter.ItemCodes.Deserialize>()!;
-
- //TODO:自动上报数据只主动采集1类数据。
- if (ammeter.AutomaticReport.Equals(1))
- {
- var tempSubCodes = new List();
- if (tempCodes.Contains("0C_49"))
- {
- tempSubCodes.Add("0C_49");
- }
-
- if (tempSubCodes.Contains("0C_149"))
- {
- tempSubCodes.Add("0C_149");
- }
-
- if (ammeter.ItemCodes.Contains("10_97"))
- {
- tempSubCodes.Add("10_97");
- }
-
- if (tempSubCodes == null || tempSubCodes.Count <= 0)
- {
- _logger.LogInformation($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}自动上报数据主动采集1类数据时数据类型为空");
- continue;
- }
- else
- {
- tempCodes = tempSubCodes;
- }
- }
-
- Dictionary keyValuePairs = new Dictionary();
-
- foreach (var tempItem in tempCodes)
- {
- //排除已发送日冻结和月冻结采集项配置
- if (DayFreezeCodes.Contains(tempItem))
- {
- continue;
- }
-
- if (MonthFreezeCodes.Contains(tempItem))
- {
- continue;
- }
-
- var itemCodeArr = tempItem.Split('_');
- var aFNStr = itemCodeArr[0];
- var aFN = (AFN)aFNStr.HexToDec();
- var fn = int.Parse(itemCodeArr[1]);
- byte[] dataInfos = null;
- if (ammeter.AutomaticReport.Equals(1) && aFN == AFN.请求实时数据)
- {
- //实时数据
- dataInfos = Build3761SendData.BuildAmmeterReadRealTimeDataSendCmd(ammeter.FocusAddress, ammeter.MeteringCode, (ATypeOfDataItems)fn);
- }
- else
- {
- string methonCode = $"AFN{aFNStr}_Fn_Send";
- //特殊表暂不处理
- if (handlerPacketBuilder != null && handlerPacketBuilder.TryGetValue(methonCode
- , out var handler))
- {
- dataInfos = handler(new TelemetryPacketRequest()
- {
- FocusAddress = ammeter.FocusAddress,
- Fn = fn,
- Pn = ammeter.MeteringCode
- });
- }
- else
- {
- _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}无效编码。");
- continue;
- }
- }
- //TODO:特殊表
-
- if (dataInfos == null || dataInfos.Length <= 0)
- {
- _logger.LogWarning($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeter.FocusAddress}的电表{ammeter.Name}采集项{tempItem}未能正确获取报文。");
- continue;
- }
-
-
-
- var meterReadingRecords = new MeterReadingRecords()
- {
- ProjectID = ammeter.ProjectID,
- DatabaseBusiID = ammeter.DatabaseBusiID,
- PendingCopyReadTime = pendingCopyReadTime,
- CreationTime = currentTime,
- MeterAddress = ammeter.AmmerterAddress,
- MeterId = ammeter.MeterId,
- MeterType = MeterTypeEnum.Ammeter,
- FocusAddress = ammeter.FocusAddress,
- FocusID = ammeter.FocusId,
- AFN = aFN,
- Fn = fn,
- ItemCode = tempItem,
- TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode),
- ManualOrNot = false,
- Pn = ammeter.MeteringCode,
- IssuedMessageId = GuidGenerator.Create().ToString(),
- IssuedMessageHexString = Convert.ToHexString(dataInfos),
- };
- //meterReadingRecords.CreateDataId(GuidGenerator.Create());
-
- keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
- }
- await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
- }
- }
- }
+
#endregion
@@ -1273,7 +1086,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
-
+
//await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
index 06b7d70..aba63e8 100644
--- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
+++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs
@@ -230,7 +230,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
///
/// 自动计算最优线程数
///
- private static int CalculateOptimalThreadCount()
+ public static int CalculateOptimalThreadCount()
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
@@ -418,6 +418,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
}
+
+ Console.WriteLine($"总共: {stats.Sum(d=>d.Count)} 条数据");
}
}
diff --git a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
index c28c82c..067c1a9 100644
--- a/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
+++ b/src/JiShe.CollectBus.KafkaProducer/KafkaSubcribesExtensions.cs
@@ -138,7 +138,7 @@ namespace JiShe.CollectBus.Kafka
int threadCount = 0;
foreach (var sub in subscribedMethods)
{
- int partitionCount = kafkaOptionConfig.NumPartitions;
+ int partitionCount = 3;// kafkaOptionConfig.NumPartitions;
//var adminClientService = provider.GetRequiredService();
//int partitionCount = sub.Attribute!.TaskCount==-1?adminClientService.GetTopicPartitionsNum(sub.Attribute!.Topic) : sub.Attribute!.TaskCount;
if (partitionCount <= 0)