优化任务创建存入Redis

This commit is contained in:
ChenYi 2025-04-15 09:43:51 +08:00
parent 142f864544
commit 5afed96fb7
2 changed files with 49 additions and 14 deletions

View File

@ -1,4 +1,5 @@
using DeviceDetectorNET.Class.Client; using Confluent.Kafka;
using DeviceDetectorNET.Class.Client;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.BuildSendDatas;
@ -105,7 +106,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var tempArryay = item.Split(":"); var tempArryay = item.Split(":");
string meteryType = tempArryay[4];//表计类别 string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率 int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if(timeDensity > 15) if (timeDensity > 15)
{ {
timeDensity = 15; timeDensity = 15;
} }
@ -132,7 +133,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
var timer = Stopwatch.StartNew();
// 解析结果(结果为嵌套数组) // 解析结果(结果为嵌套数组)
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
@ -141,17 +141,40 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return; return;
} }
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
var timer = Stopwatch.StartNew();
//处理数据 //处理数据
//await DeviceGroupBalanceControl.ProcessGenericListAsync( List<Tuple<string, Dictionary<string, MeterReadingRecords>>> tempDatas = new List<Tuple<string, Dictionary<string, MeterReadingRecords>>>();
// items: meterInfos, await DeviceGroupBalanceControl.ProcessGenericListAsync(
// deviceIdSelector: data => data.FocusAddress, items: meterInfos,
// processor: (data, threadId) => deviceIdSelector: data => data.FocusAddress,
// { processor: (data, threadId) =>
// _= AmmerterCreatePublishTask(timeDensity, data); {
_ = AmmerterCreatePublishTask(timeDensity, data);
//var keyValuePairs = AmmerterCreatePublishTask(timeDensity, data);
//if (keyValuePairs != null && keyValuePairs.Keys.Count() > 0)
//{
// //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
// var redisDataCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{keyValuePairs.First().Value.FocusAddress}";
// tempDatas.Add(Tuple.Create(redisDataCacheKey, keyValuePairs));
// //tempDatas.Add(keyValuePairs);
//}
}
);
//_logger.LogError("数据处理完成。");
//using (var pipe = FreeRedisProvider.Instance.StartPipe())
//{
// _logger.LogError("开始进入管道处理。");
// foreach (var dataItem in tempDatas)
// {
// pipe.HSet(dataItem.Item1, dataItem.Item2);
// } // }
//); // object[] ret = pipe.EndPipe();
//}
//_logger.LogError("管道处理完成。");
timer.Stop(); timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
@ -609,7 +632,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型 //构建缓存任务key依然 表计类型+采集频率+集中器地址存hash类型
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 集中器{ammeterInfo.FocusAddress}的电表{ammeterInfo.Name}数据采集指令生成失败,采集项为空,-101");
@ -768,7 +791,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
} }
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
//return keyValuePairs;
// await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
using (var pipe = FreeRedisProvider.Instance.StartPipe())
{
pipe.HSet(redisCacheKey, keyValuePairs);
object[] ret = pipe.EndPipe();
}
} }
/// <summary> /// <summary>

View File

@ -150,13 +150,15 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl
MaxDegreeOfParallelism = maxThreads.Value, MaxDegreeOfParallelism = maxThreads.Value,
}; };
TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
await Task.Run(() => await Task.Run(() =>
{ {
Parallel.For(0, cache.CachedGroups.Length, options, groupId => Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
{ {
var queue = groupQueues[groupId]; var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item)) while (queue.TryDequeue(out T item))
{ {
await Task.Delay(timeSpan);
processor(item, Thread.CurrentThread.ManagedThreadId); processor(item, Thread.CurrentThread.ManagedThreadId);
} }
}); });