diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 9c7c602..d867674 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.ComponentModel.DataAnnotations; +using System.Diagnostics; using System.Reflection; using System.Reflection.Metadata.Ecma335; using System.Text; @@ -70,7 +71,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } catch (Exception ex) { - _logger.LogError(ex, $"{nameof(InsertAsync)} 插入数据时发生异常"); + _logger.LogError(ex, $"{nameof(InsertAsync)} IoTDB插入{typeof(T).Name}的数据时发生异常"); throw; } } @@ -97,7 +98,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } catch (Exception ex) { - _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常"); throw; } } @@ -125,7 +126,7 @@ namespace JiShe.CollectBus.IoTDB.Provider } catch (Exception ex) { - _logger.LogError(ex, $"{nameof(BatchInsertAsync)} 批量插入数据时发生异常"); + _logger.LogError(ex, $"{nameof(BatchInsertAsync)} IoTDB批量插入{typeof(T).Name}的数据时发生异常"); throw; } } @@ -142,21 +143,28 @@ namespace JiShe.CollectBus.IoTDB.Provider try { var query = await BuildDeleteSQL(options); - var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); + var result = await CurrentSession.ExecuteQueryStatementAsync(query); - if (!sessionDataSet.HasNext()) + if (result == null) { - _logger.LogWarning($"{typeof(T).Name} 删除数据时,没有返回受影响记录数量。"); + return 0; + } + + if (!result.HasNext()) + { + _logger.LogWarning($"{typeof(T).Name} IoTDB删除{typeof(T).Name}的数据时,没有返回受影响记录数量。"); return 0; } //获取唯一结果行 - var row = sessionDataSet.Next(); - return row.Values[0]; + var row = result.Next(); + await result.Close(); + var dataResult = row.Values[0]; + return dataResult; } catch (Exception ex) { - _logger.LogError(ex, $"{nameof(DeleteAsync)} 删除数据时发生异常"); + _logger.LogError(ex, $"{nameof(DeleteAsync)} IoTDB删除{typeof(T).Name}的数据时发生异常"); throw; } } @@ -197,10 +205,13 @@ namespace JiShe.CollectBus.IoTDB.Provider { try { + var stopwatch2 = Stopwatch.StartNew(); + var query = await BuildQuerySQL(options); var sessionDataSet = await CurrentSession.ExecuteQueryStatementAsync(query); - + + _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); var result = new BusPagedResult { TotalCount = await GetTotalCount(options), @@ -209,15 +220,27 @@ namespace JiShe.CollectBus.IoTDB.Provider PageSize = options.PageSize, }; + stopwatch2.Stop(); + _logger.LogWarning($"{nameof(QueryAsync)} 主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); + //int totalPageCount = (int)Math.Ceiling((double)result.TotalCount / options.PageSize); - result.HasNext = result.TotalCount > 0 ? result.TotalCount < result.PageSize : false; + if (result.Items.Count() < result.PageSize) + { + result.HasNext = false; + } + else + { + result.HasNext = true; + } + + //result.HasNext = result.Items.Count() > 0 ? result.Items.Count() < result.PageSize : false; return result; } catch (Exception ex) { CurrentSession.Dispose(); - _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询数据时发生异常"); + _logger.LogError(ex, $"{nameof(QueryAsync)} IoTDB查询{typeof(T).Name}的数据时发生异常"); throw; } } @@ -352,7 +375,6 @@ namespace JiShe.CollectBus.IoTDB.Provider devicePaths.Add(DevicePathBuilder.GetTableName()); } } - } if (devicePaths.Count > 1) @@ -496,12 +518,17 @@ namespace JiShe.CollectBus.IoTDB.Provider } var result = await CurrentSession.ExecuteQueryStatementAsync(countQuery); - if (result.HasNext()) + if (result == null) { - await result.Close(); return 0; } + if (!result.HasNext()) + { + return 0; + } + + var count = Convert.ToInt32(result.Next().Values[0]); await result.Close(); diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 50df423..18df86f 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -71,12 +72,12 @@ namespace JiShe.CollectBus.Kafka.Producer QueueBufferingMaxKbytes = 2_097_151, // 修改缓冲区最大为2GB,默认为1GB CompressionType = CompressionType.Lz4, // 配置使用压缩算法LZ4,其他:gzip/snappy/zstd BatchSize = 32_768, // 修改批次大小为32K - LingerMs = 20, // 修改等待时间为20ms + LingerMs = 10, // 修改等待时间为20ms,默认为5ms Acks = Acks.All, // 表明只有所有副本Broker都收到消息才算提交成功, 可以 Acks.Leader MessageSendMaxRetries = 50, // 消息发送失败最大重试50次 MessageTimeoutMs = 120000, // 消息发送超时时间为2分钟,设置值MessageTimeoutMs > LingerMs }; - + if (_kafkaOptionConfig.EnableAuthorization) { config.SecurityProtocol = _kafkaOptionConfig.SecurityProtocol; diff --git a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs index 9936dc6..1e0136a 100644 --- a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs +++ b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildRequest.cs @@ -20,6 +20,11 @@ /// public required string ItemCode { get; set; } + /// + /// 任务时间戳 + /// + public long TimeStamp { get; set; } + /// /// 集中器转发协议构建构建参数 /// diff --git a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildResponse.cs b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildResponse.cs index b371460..2819330 100644 --- a/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildResponse.cs +++ b/protocols/JiShe.CollectBus.Protocol/Models/ProtocolBuildResponse.cs @@ -30,6 +30,11 @@ /// public int MSA { get; set; } + /// + /// 任务时间戳 + /// + public long TimeStamp { get; set; } + /// /// 报文体 /// diff --git a/protocols/JiShe.CollectBus.Protocol/Services/ProtocolService.cs b/protocols/JiShe.CollectBus.Protocol/Services/ProtocolService.cs index 8a69307..d51b7be 100644 --- a/protocols/JiShe.CollectBus.Protocol/Services/ProtocolService.cs +++ b/protocols/JiShe.CollectBus.Protocol/Services/ProtocolService.cs @@ -55,6 +55,7 @@ namespace JiShe.CollectBus.Protocol.Services { try { + //todo 必须添加本地内存缓存,然后是否需走个redis订阅???? ProtocolInfo protocolInfo= await FirstOrDefaultByDeviceAsync(deviceCode, isSpecial); if(protocolInfo==null) return null; diff --git a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index eb400c7..dc7c14e 100644 --- a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -90,26 +90,48 @@ namespace JiShe.CollectBus.Application.Contracts string redisZSetScoresIndexCacheKey, T newData) where T : DeviceCacheBasicModel; - ///// - ///// 通过集中器与表计信息排序索引获取数据 - ///// - ///// - ///// 主数据存储Hash缓存Key - ///// ZSET索引缓存Key - ///// 分页尺寸 - ///// 最后一个索引 - ///// 最后一个唯一标识 - ///// 排序方式 - ///// - //Task> GetPagedData( - //string redisHashCacheKey, - //string redisZSetScoresIndexCacheKey, - //IEnumerable focusIds, - //int pageSize = 10, - //decimal? lastScore = null, - //string lastMember = null, - //bool descending = true) - //where T : DeviceCacheBasicModel; + /// + /// 通过集中器与表计信息排序索引获取数据 + /// + /// + /// 主数据存储Hash缓存Key + /// ZSET索引缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + Task> GetPagedData( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + IEnumerable focusIds, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel; + + /// + /// 通过集中器与表计信息排序索引获取数据 + /// + /// + /// 主数据存储Hash缓存Key + /// ZSET索引缓存Key + /// ZSET索引的原始数据,例如集中地址和点位的组合 + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + Task> GetPagedData( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + string scoreValueRawData, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel; /// diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index 625c1e6..e413527 100644 --- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -367,7 +367,40 @@ namespace JiShe.CollectBus.RedisDataCache { throw new Exception(); } - + + + /// + /// 通过集中器与表计信息排序索引获取数据 + /// + /// + /// 主数据存储Hash缓存Key + /// ZSET索引缓存Key + /// ZSET索引的原始数据,例如集中地址和点位的组合 + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + public async Task> GetPagedData( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + string scoreValueRawData, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel + { + var rawDataArray = scoreValueRawData.Split(":"); + string focusAddress = rawDataArray[0]; + string point = rawDataArray[1]; + + long scoreValue = 0; + + + throw new Exception(); + } + /// /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 /// diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 208e1f1..639a9c3 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -27,6 +27,7 @@ using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using JiShe.CollectBus.Protocol.Models; +using System.Threading.Channels; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -44,7 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IoTDBRuntimeContext _runtimeContext; private readonly IProtocolService _protocolService; - int pageSize = 3000; + int pageSize = 10000; public BasicScheduledMeterReadingService( ILogger logger, @@ -199,7 +200,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 - if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity)) + var currentTaskTime = tasksToBeIssueModel.LastTaskTime.CalculateNextCollectionTime(timeDensity);//程序启动缓存电表的时候,NextTaskTime需要格式化到下一个采集点时间。 + if (!IsTaskTime(currentTaskTime, timeDensity)) { _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; @@ -207,7 +209,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterTypes = EnumExtensions.ToEnumDictionary(); - var currentTaskTime = tasksToBeIssueModel.NextTaskTime;//程序启动缓存电表的时候,NextTaskTime已经格式化到下一个采集点时间。 + //tasksToBeIssueModel.NextTaskTime; var metadata = await _dbProvider.GetMetadata(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) @@ -310,7 +312,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastMember: member); meterInfos.AddRange(page.Items); - focusAddressDataLista.AddRange(page.Items.Select(d => d.FocusAddress)); + focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}")); foreach (var item in page.Items) { if (!allIds.Add(item.MemberId)) @@ -348,21 +350,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - + var currentTaskTime = DateTime.Now; if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _applicationOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = currentTaskTime; } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { - LastTaskTime = null, + LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), TimeDensity = item.Key, - NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; - + nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key); //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); @@ -467,13 +468,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); - if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + if (taskInfo == null) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); + var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds(); var conditions = new List(); conditions.Add(new QueryCondition() @@ -504,13 +505,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); - if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + if (taskInfo == null) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); + var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds(); var conditions = new List(); conditions.Add(new QueryCondition() @@ -540,13 +541,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); - if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + if (taskInfo == null) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); + var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds(); var conditions = new List(); conditions.Add(new QueryCondition() @@ -582,14 +583,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); if (protocolPlugin == null) { - //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); - //return; + _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 创建电表待发送的任务数据{currentTime}没有找到对应的协议组件,-105"); + return null; } - - - //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? - if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { _logger.LogError($"{nameof(AmmerterCreatePublishTaskAction)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); @@ -708,7 +705,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading builderResponse: builderResponse, itemCode: tempItem, subItemCode: null, - pendingCopyReadTime: currentTime, + pendingCopyReadTime: timestamps, creationTime: currentTime, packetType: (TelemetryPacketTypeEnum)timeDensity); taskList.Add(meterReadingRecords); @@ -1001,7 +998,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tryLock = FreeRedisProvider.Instance.Lock(retryReadingEnum.ToString(), 10); try { - if (tryLock != null) + if (tryLock != null) { // 轮询IotDB未成果下发电表数据 var conditions = new List(); @@ -1037,7 +1034,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IsNumber = false, Value = false }); - await CreateMeterKafkaTaskMessage(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions() + await CreateMeterKafkaTaskMessage(ProtocolConst.AmmeterSubscriberWorkerRetryEventName, new IoTDBQueryOptions() { TableNameOrTreePath = DevicePathBuilder.GetTableName(), PageIndex = 1, @@ -1047,13 +1044,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading // 释放锁 tryLock.Unlock(); } - + } - catch(Exception) - { - // 释放锁 - tryLock.Unlock(); - throw; + catch (Exception) + { + // 释放锁 + tryLock.Unlock(); + throw; } } @@ -1096,9 +1093,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + var currentTime = DateTime.Now; if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _applicationOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = currentTime; } //先处理采集频率任务缓存 @@ -1106,11 +1104,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading { TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { - LastTaskTime = null, - TimeDensity = item.Key, - NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), + TimeDensity = item.Key, }; + nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key);//使用首次采集时间作为下一次采集时间 + //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); @@ -1175,13 +1174,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, timeDensity); var taskInfo = await FreeRedisProvider.Instance.GetAsync(redisCacheKey); - if (taskInfo == null || taskInfo.LastTaskTime.HasValue == false) + if (taskInfo == null) { _logger.LogError($"{nameof(AmmeterScheduledMeterFifteenMinuteReading)} {timeDensity}分钟获取任务下发信息失败,请检查Redis中是否有对应的任务下发信息"); return; } - var pendingCopyReadTime = taskInfo.LastTaskTime.Value.GetDateTimeOffset().ToUnixTimeNanoseconds(); + var pendingCopyReadTime = taskInfo.LastTaskTime.GetDateTimeOffset().ToUnixTimeNanoseconds(); var conditions = new List(); conditions.Add(new QueryCondition() @@ -1286,7 +1285,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading Timestamps = DateTimeOffset.Now.ToUnixTimeNanoseconds(), DatabaseBusiID = watermeter.DatabaseBusiID, PacketType = (int)TelemetryPacketTypeEnum.WatermeterAutoReadding, - PendingCopyReadTime = timestamps, + PendingCopyReadTime = timestamps, CreationTime = currentTime, MeterAddress = watermeter.MeterAddress, AFN = builderResponse.AFn, @@ -1550,10 +1549,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } + _logger.LogError($"{nameof(CreateMeterPublishTask)} 采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, - deviceIdSelector: data => data.FocusAddress, + deviceIdSelector: data => data.MeterId.ToString(), processor: (data, groupIndex) => { taskCreateAction(timeDensity, data, groupIndex, nextTaskTime); @@ -1561,7 +1561,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ); timer.Stop(); - _logger.LogInformation($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息"); + _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType} {timeDensity}分钟采集待下发任务创建完成,耗时{timer.ElapsedMilliseconds}毫秒,总共{meterInfos.Count}表计信息"); } @@ -1579,19 +1579,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} Kafka消息推送主题不能为空,-101"); return; } - int pageNumber = 0; + int pageNumber = 103; bool hasNext; var stopwatch = Stopwatch.StartNew(); do { + var stopwatch2 = Stopwatch.StartNew(); + options.PageIndex = pageNumber++; var pageResult = await _dbProvider.QueryAsync(options); hasNext = pageResult.HasNext; - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + _ = DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: pageResult.Items.ToList(), deviceIdSelector: data => data.DeviceId, processor: (data, groupIndex) => @@ -1599,11 +1601,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading _ = KafkaProducerIssuedMessageAction(kafkaTopicName, data, groupIndex); } ); - + stopwatch2.Stop(); + _logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径批次{options.PageIndex}任务数据读取完成,共消耗{stopwatch2.ElapsedMilliseconds}毫秒。"); } while (hasNext); stopwatch.Stop(); - _logger.LogInformation($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); + _logger.LogWarning($"{nameof(CreateMeterKafkaTaskMessage)} {kafkaTopicName}主题的任务 {options.TableNameOrTreePath} 路径任务推送完成,共消耗{stopwatch.ElapsedMilliseconds}毫秒。"); } /// @@ -1621,7 +1624,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new Exception($"{nameof(KafkaProducerIssuedMessageAction)} 推送消息失败,参数异常,-101"); } - await _producerService.ProduceAsync(topicName, taskRecord, partition); + await _producerService.ProduceAsync(topicName, taskRecord, partition); } /// diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index ea2234e..dc49f57 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -67,7 +67,7 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + //[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); diff --git a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index aba63e8..1734f69 100644 --- a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -192,6 +193,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl int? maxConcurrency = null) { var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); + var timer = Stopwatch.StartNew(); // 自动计算最佳并发度 int recommendedThreads = CalculateOptimalThreadCount(); @@ -225,6 +227,8 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl }); await Task.WhenAll(tasks); + timer.Stop(); + Console.WriteLine($"任务处理完成,耗时:{timer.ElapsedMilliseconds}ms"); } /// @@ -234,7 +238,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl { int coreCount = Environment.ProcessorCount; return Math.Min( - coreCount * 2, // 超线程优化 + coreCount * 8, // 超线程优化 _currentCache?.CachedGroups.Length ?? 60 ); } diff --git a/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs b/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs index 1484ab8..a2a613b 100644 --- a/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/TasksToBeIssueModel.cs @@ -14,10 +14,10 @@ namespace JiShe.CollectBus.Common.Models /// /// 上次下发任务的时间 /// - public DateTime? LastTaskTime { get; set; } + public DateTime LastTaskTime { get; set; } /// - /// 下个任务时间 + /// 下个任务时间,用于构建待采集的 LastTaskTime 任务的时间 /// public DateTime NextTaskTime { get; set; } diff --git a/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs b/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs index d3cb45f..a1cebf2 100644 --- a/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs +++ b/web/JiShe.CollectBus.Host/HealthChecks/IoTDBHealthCheck.cs @@ -36,10 +36,11 @@ namespace JiShe.CollectBus.Host.HealthChecks { try { - var ioTDbOptions = new IoTDbOptions(); - _configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions); - var pool = new SessionPoolAdapter(ioTDbOptions); - await pool.OpenAsync(); + // todo 此处需要单独创建连接,并需要在连接打开以后立即关闭,否则会影响整个连接的使用。 + //var ioTDbOptions = new IoTDbOptions(); + //_configuration.GetSection("IoTDBOptions").Bind(ioTDbOptions); + //var pool = new SessionPoolAdapter(ioTDbOptions); + //await pool.OpenAsync(); return HealthCheckResult.Healthy($"IoTDB is healthy."); } catch (Exception ex) diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index fa35bce..c76e0e5 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -17,7 +17,7 @@ 后端服务 - +