优化任务数据创建逻辑,改为上一个任务时间为待采集点

This commit is contained in:
ChenYi 2025-04-29 23:48:47 +08:00
parent d97e38439a
commit 52a7da69e9
14 changed files with 195 additions and 93 deletions

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.Metadata.Ecma335;
using System.Text;
@ -70,7 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常");
_logger.LogError(ex, $"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时发生异常");
throw;
}
}
@ -97,7 +98,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
throw;
}
}
@ -125,7 +126,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
throw;
}
}
@ -142,21 +143,28 @@ namespace JiShe.CollectBus.IoTDB.Provider
try
{
var query = await BuildDeleteSQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
var result = await CurrentSession.ExecuteQueryStatementAsync(query);
if (!sessionDataSet.HasNext())
if (result == null)
{
_logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
return 0;
}
if (!result.HasNext())
{
_logger.LogWarning($"{typeof(T).Name} IoTDB删除{typeof(T).Name}的数据时,没有返回受影响记录数量。");
return 0;
}
//获取唯一结果行
var row = sessionDataSet.Next();
return row.Values[0];
var row = result.Next();
await result.Close();
var dataResult = row.Values[0];
return dataResult;
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常");
_logger.LogError(ex, $"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时发生异常");
throw;
}
}
@ -197,10 +205,13 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
try
{
var stopwatch2 = Stopwatch.StartNew();
var query = await BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
var result = new BusPagedResult<T>
{
TotalCount = await GetTotalCount<T>(options),
@ -209,15 +220,27 @@ namespace JiShe.CollectBus.IoTDB.Provider
PageSize = options.PageSize,
};
stopwatch2.Stop();
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
//int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize);
result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false;
if (result.Items.Count() < result.PageSize)
{
result.HasNext = false;
}
else
{
result.HasNext = true;
}
//result.HasNext = result.Items.Count() > 0 ? result.Items.Count() < result.PageSize : false;
return result;
}
catch (Exception ex)
{
CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询{typeof(T).Name}的数据时发生异常");
throw;
}
}
@ -352,7 +375,6 @@ namespace JiShe.CollectBus.IoTDB.Provider
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
}
}
}
if (devicePaths.Count > 1)
@ -496,12 +518,17 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
if (result.HasNext())
if (result == null)
{
await result.Close();
return 0;
}
if (!result.HasNext())
{
return 0;
}
var count = Convert.ToInt32(result.Next().Values[0]);
await result.Close();

View File

@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@ -71,7 +72,7 @@ namespace JiShe.CollectBus.Kafka.Producer
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 20, // 修改等待时间为20ms
LingerMs = 10, // 修改等待时间为20ms默认为5ms
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs

View File

@ -20,6 +20,11 @@
/// </summary>
public required string ItemCode { get; set; }
/// <summary>
/// 任务时间戳
/// </summary>
public long TimeStamp { get; set; }
/// <summary>
/// 集中器转发协议构建构建参数
/// </summary>

View File

@ -30,6 +30,11 @@
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 任务时间戳
/// </summary>
public long TimeStamp { get; set; }
/// <summary>
/// 报文体
/// </summary>

View File

@ -55,6 +55,7 @@ namespace JiShe.CollectBus.Protocol.Services
{
try
{
//todo 必须添加本地内存缓存然后是否需走个redis订阅
ProtocolInfo protocolInfo= await FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
if(protocolInfo==null)
return null;

View File

@ -90,26 +90,48 @@ namespace JiShe.CollectBus.Application.Contracts
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel;
///// <summary>
///// 通过集中器与表计信息排序索引获取数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
///// <param name="pageSize">分页尺寸</param>
///// <param name="lastScore">最后一个索引</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
//string redisHashCacheKey,
//string redisZSetScoresIndexCacheKey,
//IEnumerable<int> focusIds,
//int pageSize = 10,
//decimal? lastScore = null,
//string lastMember = null,
//bool descending = true)
//where T : DeviceCacheBasicModel;
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
IEnumerable<int> focusIds,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel;
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="scoreValueRawData">ZSET索引的原始数据例如集中地址和点位的组合</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
string scoreValueRawData,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel;
/// <summary>

View File

@ -368,6 +368,39 @@ namespace JiShe.CollectBus.RedisDataCache
throw new Exception();
}
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="scoreValueRawData">ZSET索引的原始数据例如集中地址和点位的组合</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
string scoreValueRawData,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
var rawDataArray = scoreValueRawData.Split(":");
string focusAddress = rawDataArray[0];
string point = rawDataArray[1];
long scoreValue = 0;
throw new Exception();
}
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>

View File

@ -27,6 +27,7 @@ using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Models;
using System.Threading.Channels;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -44,7 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;
int pageSize = 3000;
int pageSize = 10000;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
@ -199,7 +200,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
if (!IsTaskTime(currentTaskTime, timeDensity))
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
@ -207,7 +209,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候NextTaskTime已经格式化到下一个采集点时间。
//tasksToBeIssueModel.NextTaskTime;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
@ -310,7 +312,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
@ -348,21 +350,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = DateTime.Now;
_applicationOptions.FirstCollectionTime = currentTaskTime;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = null,
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key,
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
@ -467,13 +468,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -504,13 +505,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -540,13 +541,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -582,14 +583,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105");
return null;
}
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
@ -708,7 +705,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
builderResponse: builderResponse,
itemCode: tempItem,
subItemCode: null,
pendingCopyReadTime: currentTime,
pendingCopyReadTime: timestamps,
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity);
taskList.Add(meterReadingRecords);
@ -1001,7 +998,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tryLock = FreeRedisProvider.Instance.Lock(retryReadingEnum.ToString(), 10);
try
{
if (tryLock != null)
if (tryLock != null)
{
// 轮询IotDB未成果下发电表数据
var conditions = new List<QueryCondition>();
@ -1037,7 +1034,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IsNumber = false,
Value = false
});
await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
await CreateMeterKafkaTaskMessage<MeterReadingTelemetryPacketInfo>(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions()
{
TableNameOrTreePath = DevicePathBuilder.GetTableName<MeterReadingTelemetryPacketInfo>(),
PageIndex = 1,
@ -1049,11 +1046,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
catch(Exception)
catch (Exception)
{
// 释放锁
tryLock.Unlock();
throw;
// 释放锁
tryLock.Unlock();
throw;
}
}
@ -1096,9 +1093,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = DateTime.Now;
_applicationOptions.FirstCollectionTime = currentTime;
}
//先处理采集频率任务缓存
@ -1106,11 +1104,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = null,
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
TimeDensity = item.Key,
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
@ -1175,13 +1174,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
if (taskInfo == null)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -1286,7 +1285,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(),
DatabaseBusiID = watermeter.DatabaseBusiID,
PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding,
PendingCopyReadTime = timestamps,
PendingCopyReadTime = timestamps,
CreationTime = currentTime,
MeterAddress = watermeter.MeterAddress,
AFN = builderResponse.AFn,
@ -1550,10 +1549,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
_logger.LogError($"{nameof(CreateMeterPublishTask)} 采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.FocusAddress,
deviceIdSelector: data => data.MeterId.ToString(),
processor: (data, groupIndex) =>
{
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
@ -1561,7 +1561,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
);
timer.Stop();
_logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
@ -1579,19 +1579,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
return;
}
int pageNumber = 0;
int pageNumber = 103;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
do
{
var stopwatch2 = Stopwatch.StartNew();
options.PageIndex = pageNumber++;
var pageResult = await _dbProvider.QueryAsync<T>(options);
hasNext = pageResult.HasNext;
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
items: pageResult.Items.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
@ -1599,11 +1601,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
}
);
stopwatch2.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
} while (hasNext);
stopwatch.Stop();
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>
@ -1621,7 +1624,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101");
}
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
await _producerService.ProduceAsync<T>(topicName, taskRecord, partition);
}
/// <summary>

View File

@ -67,7 +67,7 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
//[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");

View File

@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -192,6 +193,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
var timer = Stopwatch.StartNew();
// 自动计算最佳并发度
int recommendedThreads = CalculateOptimalThreadCount();
@ -225,6 +227,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
});
await Task.WhenAll(tasks);
timer.Stop();
Console.WriteLine($"任务处理完成,耗时:{timer.ElapsedMilliseconds}ms");
}
/// <summary>
@ -234,7 +238,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
coreCount * 2, // 超线程优化
coreCount * 8, // 超线程优化
_currentCache?.CachedGroups.Length ?? 60
);
}

View File

@ -14,10 +14,10 @@ namespace JiShe.CollectBus.Common.Models
/// <summary>
/// 上次下发任务的时间
/// </summary>
public DateTime? LastTaskTime { get; set; }
public DateTime LastTaskTime { get; set; }
/// <summary>
/// 下个任务时间
/// 下个任务时间,用于构建待采集的 LastTaskTime 任务的时间
/// </summary>
public DateTime NextTaskTime { get; set; }

View File

@ -36,10 +36,11 @@ namespace JiShe.CollectBus.Host.HealthChecks
{
try
{
var ioTDbOptions = new IoTDbOptions();
_configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions);
var pool = new SessionPoolAdapter(ioTDbOptions);
await pool.OpenAsync();
// todo 此处需要单独创建连接,并需要在连接打开以后立即关闭,否则会影响整个连接的使用。
//var ioTDbOptions = new IoTDbOptions();
//_configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions);
//var pool = new SessionPoolAdapter(ioTDbOptions);
//await pool.OpenAsync();
return HealthCheckResult.Healthy($"IoTDB is healthy.");
}
catch (Exception ex)

View File

@ -5,7 +5,7 @@
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Information",
"Default": "Warning",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
@ -89,7 +89,7 @@
"ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": true,
"OpenDebugMode": false,
"UseTableSessionPoolByDefault": false
},
"Cassandra": {