完善时间控制

This commit is contained in:
ChenYi 2025-04-22 17:58:14 +08:00
parent 5ca4cbad13
commit 5647385582
7 changed files with 25 additions and 13 deletions

View File

@ -47,5 +47,10 @@
/// 时区,默认为:"UTC+08:00" /// 时区,默认为:"UTC+08:00"
/// </summary> /// </summary>
public string ZoneId { get; set; } = "UTC+08:00"; public string ZoneId { get; set; } = "UTC+08:00";
/// <summary>
/// 请求超时时间单位毫秒默认为50000
/// </summary>
public long Timeout { get; set; } = 50000;
} }
} }

View File

@ -214,6 +214,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
} }
catch (Exception ex) catch (Exception ex)
{ {
CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw; throw;
} }
@ -414,7 +415,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity private async Task<string> BuildQuerySQL<T>(IoTDBQueryOptions options) where T : IoTEntity
{ {
var metadata = await GetMetadata<T>(); var metadata = await GetMetadata<T>();
var sb = new StringBuilder("SELECT "); var sb = new StringBuilder("SELECT TIME,");
sb.AppendJoin(", ", metadata.ColumnNames); sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}"); sb.Append($" FROM {options.TableNameOrTreePath}");

View File

@ -70,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql); return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
} }
public void Dispose() public void Dispose()

View File

@ -68,7 +68,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
/// <returns></returns> /// <returns></returns>
public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql) public async Task<SessionDataSet> ExecuteQueryStatementAsync(string sql)
{ {
return await _sessionPool.ExecuteQueryStatementAsync(sql); return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
} }
public void Dispose() public void Dispose()

View File

@ -65,6 +65,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService; _producerService = producerService;
_redisDataCacheService = redisDataCacheService; _redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value; _kafkaOptions = kafkaOptions.Value;
_runtimeContext.UseTableSessionPool = true;
} }
/// <summary> /// <summary>
@ -139,14 +141,16 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>(); var meterTypes = EnumExtensions.ToEnumDictionary<MeterTypeEnum>();
var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候NextTaskTime已经格式化到下一个采集点时间。
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
//List<MeterReadingTelemetryPacketInfo> pushTaskInfos = new(); //List<MeterReadingTelemetryPacketInfo> pushTaskInfos = new();
_runtimeContext.UseTableSessionPool = true; //_runtimeContext.UseTableSessionPool = true;
var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>(); var metadata = await _dbProvider.GetMetadata<MeterReadingTelemetryPacketInfo>();
_ = CreateMeterPublishTask<AmmeterInfo>( _ = CreateMeterPublishTask<AmmeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) => taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{ {
@ -166,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = CreateMeterPublishTask<WatermeterInfo>( _ = CreateMeterPublishTask<WatermeterInfo>(
timeDensity: timeDensity, timeDensity: timeDensity,
nextTaskTime: tasksToBeIssueModel.NextTaskTime, nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter, meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{ {
@ -183,8 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; tasksToBeIssueModel.LastTaskTime = currentTaskTime;
tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
} }
} }
@ -248,8 +252,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop(); timer1.Stop();
_logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return; return;
#else #else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif #endif
@ -460,7 +464,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息 //获取缓存中的电表信息
int timeDensity = 15; int timeDensity = 15;
//var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity);
var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity);
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey); var taskInfo = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(redisCacheKey);
@ -1031,6 +1034,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0; int pageNumber = 0;
bool hasNext; bool hasNext;
var stopwatch = Stopwatch.StartNew(); var stopwatch = Stopwatch.StartNew();
var ddd = _runtimeContext.UseTableSessionPool;
do do
{ {
options.PageIndex = pageNumber++; options.PageIndex = pageNumber++;

View File

@ -93,7 +93,7 @@
"ClusterList": [ "192.168.1.9:6667" ], "ClusterList": [ "192.168.1.9:6667" ],
"PoolSize": 2, "PoolSize": 2,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": false, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"Cassandra": { "Cassandra": {