diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs index 0d01f81..251e48b 100644 --- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs +++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs @@ -47,5 +47,10 @@ /// 时区,默认为:"UTC+08:00" /// public string ZoneId { get; set; } = "UTC+08:00"; + + /// + /// 请求超时时间,单位毫秒,默认为:50000 + /// + public long Timeout { get; set; } = 50000; } } diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 7cfaf32..4615053 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -214,6 +214,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } catch (Exception ex) { + CurrentSession.Dispose(); _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); throw; } @@ -414,7 +415,7 @@ namespace JiShe.CollectBus.IoTDB.Provider private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity { var metadata = await GetMetadata(); - var sb = new StringBuilder("SELECT "); + var sb = new StringBuilder("SELECT TIME,"); sb.AppendJoin(", ", metadata.ColumnNames); sb.Append($" FROM {options.TableNameOrTreePath}"); diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs index eacf246..c8c36ee 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs @@ -70,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql); + return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout); } public void Dispose() diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs index 137f5a8..a6e0f2a 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs @@ -68,7 +68,7 @@ namespace JiShe.CollectBus.IoTDB.Provider /// public async Task ExecuteQueryStatementAsync(string sql) { - return await _sessionPool.ExecuteQueryStatementAsync(sql); + return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout); } public void Dispose() diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 3437f4b..40dfeba 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -65,6 +65,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; + + _runtimeContext.UseTableSessionPool = true; } /// @@ -138,15 +140,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var meterTypes = EnumExtensions.ToEnumDictionary(); + + var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。 if (meteryType == MeterTypeEnum.Ammeter.ToString()) { //List pushTaskInfos = new(); - _runtimeContext.UseTableSessionPool = true; + //_runtimeContext.UseTableSessionPool = true; var metadata = await _dbProvider.GetMetadata(); _ = CreateMeterPublishTask( timeDensity: timeDensity, - nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity), + nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, timestamps) => { @@ -166,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = CreateMeterPublishTask( timeDensity: timeDensity, - nextTaskTime: tasksToBeIssueModel.NextTaskTime, + nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, taskCreateAction: (timeDensity, data, groupIndex, taskBatch) => { @@ -183,8 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 - tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime; - tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity); + tasksToBeIssueModel.LastTaskTime = currentTaskTime; + tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } } @@ -248,8 +252,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif @@ -460,7 +464,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 15; //var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity); - var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity); var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); @@ -1031,6 +1034,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading int pageNumber = 0; bool hasNext; var stopwatch = Stopwatch.StartNew(); + + var ddd = _runtimeContext.UseTableSessionPool; + do { options.PageIndex = pageNumber++; diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 88209c2..b25f9f0 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +