Compare commits

..

No commits in common. "5890b16570a9d286e85adebe8684d2e666c8d35f" and "b0bbf67f870060d7d2439451b66acda4da7e1550" have entirely different histories.

14 changed files with 93 additions and 195 deletions

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.Metadata.Ecma335;
using System.Text;
@ -71,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时发生异常");
_logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常");
throw;
}
}
@ -98,7 +97,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
throw;
}
}
@ -126,7 +125,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常");
_logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常");
throw;
}
}
@ -143,28 +142,21 @@ namespace JiShe.CollectBus.IoTDB.Provider
try
{
var query = await BuildDeleteSQL<T>(options);
var result = await CurrentSession.ExecuteQueryStatementAsync(query);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
if (result == null)
if (!sessionDataSet.HasNext())
{
return 0;
}
if (!result.HasNext())
{
_logger.LogWarning($"{typeof(T).Name} IoTDB删除{typeof(T).Name}的数据时,没有返回受影响记录数量。");
_logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。");
return 0;
}
//获取唯一结果行
var row = result.Next();
await result.Close();
var dataResult = row.Values[0];
return dataResult;
var row = sessionDataSet.Next();
return row.Values[0];
}
catch (Exception ex)
{
_logger.LogError(ex, $"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时发生异常");
_logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常");
throw;
}
}
@ -205,13 +197,10 @@ namespace JiShe.CollectBus.IoTDB.Provider
{
try
{
var stopwatch2 = Stopwatch.StartNew();
var query = await BuildQuerySQL<T>(options);
var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query);
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
var result = new BusPagedResult<T>
{
TotalCount = await GetTotalCount<T>(options),
@ -220,27 +209,15 @@ namespace JiShe.CollectBus.IoTDB.Provider
PageSize = options.PageSize,
};
stopwatch2.Stop();
_logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
//int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize);
if (result.Items.Count() < result.PageSize)
{
result.HasNext = false;
}
else
{
result.HasNext = true;
}
//result.HasNext = result.Items.Count() > 0 ? result.Items.Count() < result.PageSize : false;
result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false;
return result;
}
catch (Exception ex)
{
CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询{typeof(T).Name}的数据时发生异常");
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw;
}
}
@ -375,6 +352,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
devicePaths.Add(DevicePathBuilder.GetTableName<T>());
}
}
}
if (devicePaths.Count > 1)
@ -518,17 +496,12 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery);
if (result == null)
if (result.HasNext())
{
await result.Close();
return 0;
}
if (!result.HasNext())
{
return 0;
}
var count = Convert.ToInt32(result.Next().Values[0]);
await result.Close();

View File

@ -1,5 +1,4 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
@ -72,7 +71,7 @@ namespace JiShe.CollectBus.Kafka.Producer
QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB默认为1GB
CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4其他gzip/snappy/zstd
BatchSize = 32_768, // 修改批次大小为32K
LingerMs = 10, // 修改等待时间为20ms默认为5ms
LingerMs = 20, // 修改等待时间为20ms
Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader
MessageSendMaxRetries = 50, // 消息发送失败最大重试50次
MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs

View File

@ -20,11 +20,6 @@
/// </summary>
public required string ItemCode { get; set; }
/// <summary>
/// 任务时间戳
/// </summary>
public long TimeStamp { get; set; }
/// <summary>
/// 集中器转发协议构建构建参数
/// </summary>

View File

@ -30,11 +30,6 @@
/// </summary>
public int MSA { get; set; }
/// <summary>
/// 任务时间戳
/// </summary>
public long TimeStamp { get; set; }
/// <summary>
/// 报文体
/// </summary>

View File

@ -55,7 +55,6 @@ namespace JiShe.CollectBus.Protocol.Services
{
try
{
//todo 必须添加本地内存缓存然后是否需走个redis订阅
ProtocolInfo protocolInfo= await FirstOrDefaultByDeviceAsync(deviceCode, isSpecial);
if(protocolInfo==null)
return null;

View File

@ -90,48 +90,26 @@ namespace JiShe.CollectBus.Application.Contracts
string redisZSetScoresIndexCacheKey,
T newData) where T : DeviceCacheBasicModel;
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
IEnumerable<int> focusIds,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel;
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="scoreValueRawData">ZSET索引的原始数据例如集中地址和点位的组合</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
string scoreValueRawData,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel;
///// <summary>
///// 通过集中器与表计信息排序索引获取数据
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
///// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
///// <param name="pageSize">分页尺寸</param>
///// <param name="lastScore">最后一个索引</param>
///// <param name="lastMember">最后一个唯一标识</param>
///// <param name="descending">排序方式</param>
///// <returns></returns>
//Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
//string redisHashCacheKey,
//string redisZSetScoresIndexCacheKey,
//IEnumerable<int> focusIds,
//int pageSize = 10,
//decimal? lastScore = null,
//string lastMember = null,
//bool descending = true)
//where T : DeviceCacheBasicModel;
/// <summary>

View File

@ -368,39 +368,6 @@ namespace JiShe.CollectBus.RedisDataCache
throw new Exception();
}
/// <summary>
/// 通过集中器与表计信息排序索引获取数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="scoreValueRawData">ZSET索引的原始数据例如集中地址和点位的组合</param>
/// <param name="pageSize">分页尺寸</param>
/// <param name="lastScore">最后一个索引</param>
/// <param name="lastMember">最后一个唯一标识</param>
/// <param name="descending">排序方式</param>
/// <returns></returns>
public async Task<BusCacheGlobalPagedResult<T>> GetPagedData<T>(
string redisHashCacheKey,
string redisZSetScoresIndexCacheKey,
string scoreValueRawData,
int pageSize = 10,
decimal? lastScore = null,
string lastMember = null,
bool descending = true)
where T : DeviceCacheBasicModel
{
var rawDataArray = scoreValueRawData.Split(":");
string focusAddress = rawDataArray[0];
string point = rawDataArray[1];
long scoreValue = 0;
throw new Exception();
}
/// <summary>
/// 通过ZSET索引获取数据支持10万级别数据处理控制在13秒以内。
/// </summary>

View File

@ -27,7 +27,6 @@ using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using JiShe.CollectBus.Protocol.Models;
using System.Threading.Channels;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@ -45,7 +44,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;
int pageSize = 10000;
int pageSize = 3000;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
@ -200,8 +199,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候NextTaskTime需要格式化到下一个采集点时间。
if (!IsTaskTime(currentTaskTime, timeDensity))
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
{
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103");
continue;
@ -209,7 +207,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
//tasksToBeIssueModel.NextTaskTime;
var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候NextTaskTime已经格式化到下一个采集点时间。
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
if (meteryType == MeterTypeEnum.Ammeter.ToString())
@ -312,7 +310,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
lastMember: member);
meterInfos.AddRange(page.Items);
focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}"));
focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress));
foreach (var item in page.Items)
{
if (!allIds.Add(item.MemberId))
@ -350,20 +348,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTaskTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTaskTime;
_applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
LastTaskTime = null,
TimeDensity = item.Key,
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
@ -468,13 +467,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -505,13 +504,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -541,13 +540,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -583,10 +582,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType);
if (protocolPlugin == null)
{
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105");
return null;
//_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105");
//return;
}
//todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿?
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
_logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
@ -705,7 +708,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
builderResponse: builderResponse,
itemCode: tempItem,
subItemCode: null,
pendingCopyReadTime: timestamps,
pendingCopyReadTime: currentTime,
creationTime: currentTime,
packetType: (TelemetryPacketTypeEnum)timeDensity);
taskList.Add(meterReadingRecords);
@ -1093,10 +1096,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
var currentTime = DateTime.Now;
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_applicationOptions.FirstCollectionTime = currentTime;
_applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
@ -1104,12 +1106,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),
LastTaskTime = null,
TimeDensity = item.Key,
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
@ -1174,13 +1175,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
if (taskInfo == null)
if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败请检查Redis中是否有对应的任务下发信息");
return;
}
var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds();
var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds();
var conditions = new List<QueryCondition>();
conditions.Add(new QueryCondition()
@ -1549,11 +1550,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105");
return;
}
_logger.LogError($"{nameof(CreateMeterPublishTask)} 采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒");
await DeviceGroupBalanceControl.ProcessWithThrottleAsync(
items: meterInfos,
deviceIdSelector: data => data.MeterId.ToString(),
deviceIdSelector: data => data.FocusAddress,
processor: (data, groupIndex) =>
{
taskCreateAction(timeDensity, data, groupIndex, nextTaskTime);
@ -1561,7 +1561,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
);
timer.Stop();
_logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
_logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息");
}
@ -1579,21 +1579,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空-101");
return;
}
int pageNumber = 103;
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
do
{
var stopwatch2 = Stopwatch.StartNew();
options.PageIndex = pageNumber++;
var pageResult = await _dbProvider.QueryAsync<T>(options);
hasNext = pageResult.HasNext;
_ = DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
await DeviceGroupBalanceControl.ProcessWithThrottleAsync<T>(
items: pageResult.Items.ToList(),
deviceIdSelector: data => data.DeviceId,
processor: (data, groupIndex) =>
@ -1601,12 +1599,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex);
}
);
stopwatch2.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。");
} while (hasNext);
stopwatch.Stop();
_logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
_logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。");
}
/// <summary>

View File

@ -67,7 +67,7 @@ namespace JiShe.CollectBus.Subscribers
/// </summary>
/// <param name="receivedMessage"></param>
/// <returns></returns>
//[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)]
public async Task<ISubscribeAck> AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage)
{
_logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理");

View File

@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -193,7 +192,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
var timer = Stopwatch.StartNew();
// 自动计算最佳并发度
int recommendedThreads = CalculateOptimalThreadCount();
@ -227,8 +225,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
});
await Task.WhenAll(tasks);
timer.Stop();
Console.WriteLine($"任务处理完成,耗时:{timer.ElapsedMilliseconds}ms");
}
/// <summary>
@ -238,7 +234,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
coreCount * 8, // 超线程优化
coreCount * 2, // 超线程优化
_currentCache?.CachedGroups.Length ?? 60
);
}

View File

@ -14,10 +14,10 @@ namespace JiShe.CollectBus.Common.Models
/// <summary>
/// 上次下发任务的时间
/// </summary>
public DateTime LastTaskTime { get; set; }
public DateTime? LastTaskTime { get; set; }
/// <summary>
/// 下个任务时间,用于构建待采集的 LastTaskTime 任务的时间
/// 下个任务时间
/// </summary>
public DateTime NextTaskTime { get; set; }

View File

@ -36,11 +36,10 @@ namespace JiShe.CollectBus.Host.HealthChecks
{
try
{
// todo 此处需要单独创建连接,并需要在连接打开以后立即关闭,否则会影响整个连接的使用。
//var ioTDbOptions = new IoTDbOptions();
//_configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions);
//var pool = new SessionPoolAdapter(ioTDbOptions);
//await pool.OpenAsync();
var ioTDbOptions = new IoTDbOptions();
_configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions);
var pool = new SessionPoolAdapter(ioTDbOptions);
await pool.OpenAsync();
return HealthCheckResult.Healthy($"IoTDB is healthy.");
}
catch (Exception ex)

View File

@ -5,7 +5,7 @@
"Serilog.Sinks.File"
],
"MinimumLevel": {
"Default": "Warning",
"Default": "Information",
"Override": {
"Microsoft": "Warning",
"Volo.Abp": "Warning",
@ -89,7 +89,7 @@
"ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 32,
"DataBaseName": "energy",
"OpenDebugMode": false,
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
"Cassandra": {