diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index df03914..914bfa1 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using DeviceDetectorNET.Class.Client; +using Confluent.Kafka; +using DeviceDetectorNET.Class.Client; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; @@ -105,7 +106,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var tempArryay = item.Split(":"); string meteryType = tempArryay[4];//表计类别 int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 - if(timeDensity > 15) + if (timeDensity > 15) { timeDensity = 15; } @@ -132,7 +133,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - var timer = Stopwatch.StartNew(); // 解析结果(结果为嵌套数组) var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); if (meterInfos == null || meterInfos.Count <= 0) @@ -141,17 +141,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - + + var timer = Stopwatch.StartNew(); //处理数据 - //await DeviceGroupBalanceControl.ProcessGenericListAsync( - // items: meterInfos, - // deviceIdSelector: data => data.FocusAddress, - // processor: (data, threadId) => - // { - // _= AmmerterCreatePublishTask(timeDensity, data); + List>> tempDatas = new List>>(); + await DeviceGroupBalanceControl.ProcessGenericListAsync( + items: meterInfos, + deviceIdSelector: data => data.FocusAddress, + processor: (data, threadId) => + { + _ = AmmerterCreatePublishTask(timeDensity, data); + //var keyValuePairs = AmmerterCreatePublishTask(timeDensity, data); + //if (keyValuePairs != null && keyValuePairs.Keys.Count() > 0) + //{ + // //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 + // var redisDataCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{keyValuePairs.First().Value.FocusAddress}"; + // tempDatas.Add(Tuple.Create(redisDataCacheKey, keyValuePairs)); + // //tempDatas.Add(keyValuePairs); + //} + } + ); + + //_logger.LogError("数据处理完成。"); + + //using (var pipe = FreeRedisProvider.Instance.StartPipe()) + //{ + // _logger.LogError("开始进入管道处理。"); + // foreach (var dataItem in tempDatas) + // { + // pipe.HSet(dataItem.Item1, dataItem.Item2); // } - //); + // object[] ret = pipe.EndPipe(); + //} + //_logger.LogError("管道处理完成。"); timer.Stop(); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); @@ -609,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; - + if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); @@ -768,7 +791,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); } - // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); + //await Task.Delay(timeSpan); + + //return keyValuePairs; + // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + + using (var pipe = FreeRedisProvider.Instance.StartPipe()) + { + pipe.HSet(redisCacheKey, keyValuePairs); + object[] ret = pipe.EndPipe(); + } } /// diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index c7a5acd..e9ac9bc 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -150,13 +150,15 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl MaxDegreeOfParallelism = maxThreads.Value, }; + TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); await Task.Run(() => { - Parallel.For(0, cache.CachedGroups.Length, options, groupId => + Parallel.For(0, cache.CachedGroups.Length, options, async groupId => { var queue = groupQueues[groupId]; while (queue.TryDequeue(out T item)) { + await Task.Delay(timeSpan); processor(item, Thread.CurrentThread.ManagedThreadId); } });