diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index c335597..f0f7fbe 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -72,51 +72,75 @@ namespace JiShe.CollectBus.DataChannels /// public async Task ScheduledMeterTaskReadding(ChannelReader>> _telemetryPacketInfoReader) { - var metadata = await _dbProvider.GetMetadata(); - var batchSize = 10000; - var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒 - - List taskInfoList = new List(); - var startTime = DateTime.Now; - var timer = new Stopwatch(); - while (true) + try { - var canRead = await _telemetryPacketInfoReader.WaitToReadAsync(); - if (!canRead) - { - continue; - } + var metadata = await _dbProvider.GetMetadata(); + var batchSize = 200_00; + var timeout = TimeSpan.FromSeconds(5); // 默认超时时间为5秒 + var timer = Stopwatch.StartNew(); + long timeoutMilliseconds = 0; - while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout) + List taskInfoList = new List(); + while (true) { - if (_telemetryPacketInfoReader.TryRead(out var dataItem)) + var canRead = _telemetryPacketInfoReader.Count; + if (canRead <= 0) { - taskInfoList.AddRange(dataItem.Item2); - } - else - { - //无消息时短暂等待 - await Task.Delay(5); - } - } - - if (taskInfoList != null && taskInfoList.Count > 0) - { - await _dbProvider.BatchInsertAsync(metadata, taskInfoList); - - await DeviceGroupBalanceControl.ProcessWithThrottleAsync( - items: taskInfoList.ToList(), - deviceIdSelector: data => data.DeviceId, - processor: (data, groupIndex) => + if (timeoutMilliseconds > 0) { - // _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex); + _logger.LogError($"{nameof(ScheduledMeterTaskReadding)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); } - ); + timeoutMilliseconds = 0; + //无消息时短等待1秒 + await Task.Delay(100_0); + continue; + } + timer.Restart(); - taskInfoList.Clear(); + var startTime = DateTime.Now; + + while (taskInfoList != null && taskInfoList.Count < batchSize && (DateTime.Now - startTime) < timeout) + { + try + { + if (_telemetryPacketInfoReader.TryRead(out var dataItem)) + { + taskInfoList.AddRange(dataItem.Item2); + } + } + catch (Exception ee) + { + + throw; + } + } + + if (taskInfoList != null && taskInfoList.Count > 0) + { + await _dbProvider.BatchInsertAsync(metadata, taskInfoList); + + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( + items: taskInfoList.ToList(), + deviceIdSelector: data => data.DeviceId, + processor: (data, groupIndex) => + { + // _ = KafkaProducerIssuedMessageAction(dateItem.Item1, data, groupIndex); + } + ); + + taskInfoList.Clear(); + } + timer.Stop(); + + timeoutMilliseconds = timeoutMilliseconds + timer.ElapsedMilliseconds; + + startTime = DateTime.Now; } - - startTime = DateTime.Now; + } + catch (Exception ex) + { + + throw; } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 6d5e01a..9740c2e 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -32,6 +32,7 @@ using static IdentityModel.ClaimComparer; using JiShe.CollectBus.DataChannels; using JiShe.CollectBus.DataMigration.Options; using static System.Runtime.InteropServices.JavaScript.JSType; +using static System.Formats.Asn1.AsnWriter; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -47,7 +48,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IProtocolService _protocolService; private readonly DataMigrationOptions _dataMigrationOptions; private readonly KafkaOptionConfig _kafkaOptions; - private readonly ServerApplicationOptions _applicationOptions; + private readonly ServerApplicationOptions _applicationOptions; int pageSize = 10000; @@ -69,7 +70,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _dataMigrationOptions = dataMigrationOptions.Value; _kafkaOptions = kafkaOptions.Value; - _applicationOptions = applicationOptions.Value; + _applicationOptions = applicationOptions.Value; } @@ -116,7 +117,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } var currentTime = DateTime.Now; - + //定时抄读 foreach (var item in taskInfos) @@ -169,7 +170,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading meterType: MeterTypeEnum.Ammeter, taskCreateAction: async (timeDensity, data, groupIndex, timestamps) => { - var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); + var tempTask = await ConcentratorScheduledAutomaticGetTerminalVersion(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { _logger.LogWarning($"集中器 {data.Name} 任务数据构建失败:{data.Serialize()}"); @@ -257,7 +258,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tempTask = await AmmerterCreatePublishTaskAction(timeDensity, data, groupIndex, timestamps); if (tempTask == null || tempTask.Count <= 0) { - _logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); + //_logger.LogWarning($"电表 {data.Name} 任务数据构建失败:{data.Serialize()}"); return; } _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, tempTask)); @@ -298,14 +299,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //电表定时阀控任务处理。 - var autoValveControlTask = await AmmeterScheduledAutoValveControl(); + var autoValveControlTask = await AmmeterScheduledAutoValveControl(); if (autoValveControlTask == null || autoValveControlTask.Count <= 0) { _logger.LogWarning($"{nameof(AmmeterScheduledAutoValveControl)}电表定时阀控没有可操作的任务"); return; } - _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); + _ = _dataChannelManage.ScheduledMeterTaskWriter(DataChannelManage.TaskDataChannel.Writer, Tuple.Create(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, autoValveControlTask)); } #region 电表采集处理 @@ -332,9 +333,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading //此处代码不要删除 #if DEBUG var timeDensity = "15"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var serverTagName = "JiSheCollectBus2"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; List meterInfos = new List(); List focusAddressDataLista = new List(); @@ -1499,8 +1501,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading List meterInfos = new List(); decimal? cursor = null; string member = null; - bool hasNext; - do + + while (true) { var page = await _redisDataCacheService.GetAllPagedData( redisCacheMeterInfoHashKeyTemp, @@ -1510,10 +1512,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading lastMember: member); meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + if (!page.HasNext) + { + break; + } + + cursor = page.NextScore; + member = page.NextMember; + } //var page = await _redisDataCacheService.GetAllPagedData( @@ -1530,8 +1536,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } + timer.Stop(); + _logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); + timer.Restart(); + await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.MeterId.ToString(), diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 2a96f64..d652722 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -166,7 +166,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID INNER JOIN TB_FocusInfo(nolock) AS D ON D.ID=C.FocusID INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID - WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' and A.TripTime"; + WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' "; if (!string.IsNullOrWhiteSpace(currentTime)) { diff --git a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 4149afd..470efe7 100644 --- a/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/shared/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -204,6 +204,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl int actualThreads = maxConcurrency ?? recommendedThreads; + // 创建节流器 using var throttler = new SemaphoreSlim(initialCount: actualThreads);