From 5647385582ef155ec77841a3fc741b361fffe3e7 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Tue, 22 Apr 2025 17:58:14 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=97=B6=E9=97=B4=E6=8E=A7?=
=?UTF-8?q?=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Options/IoTDBOptions.cs | 5 +++++
.../Provider/IoTDBProvider.cs | 3 ++-
.../Provider/SessionPoolAdapter.cs | 2 +-
.../Provider/TableSessionPoolAdapter.cs | 2 +-
.../BasicScheduledMeterReadingService.cs | 22 ++++++++++++-------
.../Pages/Monitor.cshtml | 2 +-
web/JiShe.CollectBus.Host/appsettings.json | 2 +-
7 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
index 0d01f81..251e48b 100644
--- a/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Options/IoTDBOptions.cs
@@ -47,5 +47,10 @@
/// 时区,默认为:"UTC+08:00"
///
public string ZoneId { get; set; } = "UTC+08:00";
+
+ ///
+ /// 请求超时时间,单位毫秒,默认为:50000
+ ///
+ public long Timeout { get; set; } = 50000;
}
}
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
index 7cfaf32..4615053 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs
@@ -214,6 +214,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
}
catch (Exception ex)
{
+ CurrentSession.Dispose();
_logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常");
throw;
}
@@ -414,7 +415,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
private async Task BuildQuerySQL(IoTDBQueryOptions options) where T : IoTEntity
{
var metadata = await GetMetadata();
- var sb = new StringBuilder("SELECT ");
+ var sb = new StringBuilder("SELECT TIME,");
sb.AppendJoin(", ", metadata.ColumnNames);
sb.Append($" FROM {options.TableNameOrTreePath}");
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
index eacf246..c8c36ee 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/SessionPoolAdapter.cs
@@ -70,7 +70,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql);
+ return await _sessionPool.ExecuteQueryStatementAsync(sql, _options.Timeout);
}
public void Dispose()
diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
index 137f5a8..a6e0f2a 100644
--- a/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
+++ b/modules/JiShe.CollectBus.IoTDB/Provider/TableSessionPoolAdapter.cs
@@ -68,7 +68,7 @@ namespace JiShe.CollectBus.IoTDB.Provider
///
public async Task ExecuteQueryStatementAsync(string sql)
{
- return await _sessionPool.ExecuteQueryStatementAsync(sql);
+ return await _sessionPool.ExecuteQueryStatementAsync(sql,_options.Timeout);
}
public void Dispose()
diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 3437f4b..40dfeba 100644
--- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -65,6 +65,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
+
+ _runtimeContext.UseTableSessionPool = true;
}
///
@@ -138,15 +140,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var meterTypes = EnumExtensions.ToEnumDictionary();
+
+ var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。
if (meteryType == MeterTypeEnum.Ammeter.ToString())
{
//List pushTaskInfos = new();
- _runtimeContext.UseTableSessionPool = true;
+ //_runtimeContext.UseTableSessionPool = true;
var metadata = await _dbProvider.GetMetadata();
_ = CreateMeterPublishTask(
timeDensity: timeDensity,
- nextTaskTime: tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity),
+ nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, timestamps) =>
{
@@ -166,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_ = CreateMeterPublishTask(
timeDensity: timeDensity,
- nextTaskTime: tasksToBeIssueModel.NextTaskTime,
+ nextTaskTime: currentTaskTime,
meterType: MeterTypeEnum.Ammeter,
taskCreateAction: (timeDensity, data, groupIndex, taskBatch) =>
{
@@ -183,8 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。
- tasksToBeIssueModel.LastTaskTime = tasksToBeIssueModel.NextTaskTime;
- tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.CalculateNextCollectionTime(timeDensity);
+ tasksToBeIssueModel.LastTaskTime = currentTaskTime;
+ tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
}
@@ -248,8 +252,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop();
_logger.LogError($"读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒");
- //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
- //return;
+ DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
+ return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@@ -460,7 +464,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 15;
//var currentTime = DateTime.Now.CalculateNextCollectionTime(timeDensity);
- var currentTime = Convert.ToDateTime("2025-04-21 17:42:00").CalculateNextCollectionTime(timeDensity);
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName,MeterTypeEnum.Ammeter,timeDensity);
var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey);
@@ -1031,6 +1034,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int pageNumber = 0;
bool hasNext;
var stopwatch = Stopwatch.StartNew();
+
+ var ddd = _runtimeContext.UseTableSessionPool;
+
do
{
options.PageIndex = pageNumber++;
diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
index 88209c2..b25f9f0 100644
--- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
+++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml
@@ -17,7 +17,7 @@
后端服务
-
+