dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
5 changed files with 156 additions and 85 deletions
Showing only changes of commit 142f864544 - Show all commits

View File

@ -9,6 +9,7 @@ using JiShe.CollectBus.Serializer;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.Application.Services; using Volo.Abp.Application.Services;
@ -38,57 +39,87 @@ public abstract class CollectBusAppService : ApplicationService
/// <returns></returns> /// <returns></returns>
protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{ {
if (redisKeys == null || redisKeys.Length <=0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
{ {
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101"); throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
} }
//通过lua脚本一次性获取所有缓存内容 var meterInfos = new Dictionary<string, Dictionary<string, T>>();
var luaScript = @" var luaScript = @"
local results = {} local results = {}
for i, key in ipairs(KEYS) do for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key) local data = redis.call('HGETALL', key)
results[i] = {key, data} results[i] = {key, data}
end end
return results"; return results";
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,没有获取到数据,-102");
}
// 解析结果(结果为嵌套数组) // 分页参数每页处理10000个键
var meterInfos = new Dictionary<string, Dictionary<string, T>>(); ; int pageSize = 10000;
if (merterResult is object[] arr) int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{ {
foreach (object[] item in arr) // 分页获取当前批次的键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前批次数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{ {
string key = (string)item[0];//集中器地址对应的Redis缓存Key throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102");
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 }
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
var meterHashs = new Dictionary<string, T>(); // 解析当前批次的结果
for (int i = 0; i < fieldsAndValues.Length; i += 2) if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{ {
string meterld = (string)fieldsAndValues[i];//表ID string key = (string)item[0];
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");
T meterInfo = default!; var meterHashs = new Dictionary<string, T>();
if (!string.IsNullOrWhiteSpace(meterStr)) for (int i = 0; i < fieldsAndValues.Length; i += 2)
{ {
meterInfo = meterStr.Deserialize<T>()!; string meterId = (string)fieldsAndValues[i];
string meterStr = (string)fieldsAndValues[i + 1];
T meterInfo = default!;
if (!string.IsNullOrWhiteSpace(meterStr))
{
meterInfo = meterStr.Deserialize<T>()!;
}
if (meterInfo != null)
{
meterHashs[meterId] = meterInfo;
}
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103");
}
} }
if (meterInfo != null)
// 合并到总结果若存在重复key则覆盖
if (meterInfos.ContainsKey(focusAddress))
{ {
meterHashs[meterld] = meterInfo; foreach (var kvp in meterHashs)
{
meterInfos[focusAddress][kvp.Key] = kvp.Value;
}
} }
else else
{ {
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-102"); meterInfos[focusAddress] = meterHashs;
} }
} }
meterInfos[focusAddress] = meterHashs; }
else
{
throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104");
} }
} }
@ -105,58 +136,87 @@ public abstract class CollectBusAppService : ApplicationService
/// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param> /// <param name="timeDensity">采集频率1分钟、5分钟、15分钟</param>
/// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param> /// <param name="meterType"><see cref="MeterTypeEnum"/> 表计类型</param>
/// <returns></returns> /// <returns></returns>
protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys,string systemType,string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
{ {
if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) if (redisKeys == null || redisKeys.Length <= 0 ||
string.IsNullOrWhiteSpace(systemType) ||
string.IsNullOrWhiteSpace(serverTagName) ||
string.IsNullOrWhiteSpace(timeDensity))
{ {
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,参数异常,-101"); throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101");
} }
//通过lua脚本一次性获取所有缓存内容 var meterInfos = new List<T>();
var luaScript = @" var luaScript = @"
local results = {} local results = {}
for i, key in ipairs(KEYS) do for i, key in ipairs(KEYS) do
local data = redis.call('HGETALL', key) local data = redis.call('HGETALL', key)
results[i] = {key, data} results[i] = {key, data}
end end
return results"; return results";
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS
if (merterResult == null)
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,没有获取到数据,-102");
}
// 解析结果(结果为嵌套数组) // 分页参数每页10000个键
var meterInfos = new List<T>(); ; int pageSize = 10000;
if (merterResult is object[] arr) int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize);
for (int page = 0; page < totalPages; page++)
{ {
foreach (object[] item in arr) // 分页获取当前批次键
var batchKeys = redisKeys
.Skip(page * pageSize)
.Take(pageSize)
.ToArray();
// 执行Lua脚本获取当前页数据
var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys);
if (merterResult == null)
{ {
string key = (string)item[0];//集中器地址对应的Redis缓存Key throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102");
object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 }
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");//集中器地址
for (int i = 0; i < fieldsAndValues.Length; i += 2) // 解析当前页结果
if (merterResult is object[] arr)
{
foreach (object[] item in arr)
{ {
string meterld = (string)fieldsAndValues[i];//表ID string key = (string)item[0];
string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = string.Format(
RedisConst.CacheMeterInfoKey,
systemType,
serverTagName,
meterType,
timeDensity
);
string focusAddress = key.Replace(redisCacheKey, "");
T meterInfo = default!; for (int i = 0; i < fieldsAndValues.Length; i += 2)
if (!string.IsNullOrWhiteSpace(meterStr))
{ {
meterInfo = meterStr.Deserialize<T>()!; string meterId = (string)fieldsAndValues[i];
} string meterStr = (string)fieldsAndValues[i + 1];
if (meterInfo != null)
{ T meterInfo = default!;
meterInfos.Add(meterInfo); if (!string.IsNullOrWhiteSpace(meterStr))
} {
else meterInfo = meterStr.Deserialize<T>()!;
{ }
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-103"); if (meterInfo != null)
{
meterInfos.Add(meterInfo);
}
else
{
throw new Exception(
$"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}-103"
);
}
} }
} }
} }
else
{
throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104");
}
} }
return meterInfos; return meterInfos;

View File

@ -13,13 +13,13 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context; using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.IotSystems.AFNEntity;
using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using System.Diagnostics.Metrics; using System.Diagnostics.Metrics;
using JiShe.CollectBus.Common.DeviceBalanceControl;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;

View File

@ -103,8 +103,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率 //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBustempArryay[1]=>SystemTypeConsttempArryay[2]=>TaskInfotempArryay[3]=>表计类别tempArryay[4]=>采集频率
var tempArryay = item.Split(":"); var tempArryay = item.Split(":");
string meteryType = tempArryay[3];//表计类别 string meteryType = tempArryay[4];//表计类别
int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率 int timeDensity = Convert.ToInt32(tempArryay[5]);//采集频率
if(timeDensity > 15)
{
timeDensity = 15;
}
//检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过 //检查任务时间节点由于定时任务10秒钟运行一次需要判定当前时间是否在任务时间节点内不在则跳过
if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity)) if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity))
@ -128,6 +132,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (meteryType == MeterTypeEnum.Ammeter.ToString()) if (meteryType == MeterTypeEnum.Ammeter.ToString())
{ {
var timer = Stopwatch.StartNew();
// 解析结果(结果为嵌套数组) // 解析结果(结果为嵌套数组)
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]);
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
@ -136,16 +141,21 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return; return;
} }
//await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos);
// 处理数据
await DeviceGroupBalanceControl.ProcessGenericListAsync( //处理数据
items: meterInfos, //await DeviceGroupBalanceControl.ProcessGenericListAsync(
deviceIdSelector: data => data.FocusAddress, // items: meterInfos,
processor: (data, threadId) => // deviceIdSelector: data => data.FocusAddress,
{ // processor: (data, threadId) =>
_= AmmerterCreatePublishTask(timeDensity, data); // {
} // _= AmmerterCreatePublishTask(timeDensity, data);
); // }
//);
timer.Stop();
_logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}");
} }
else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) else if (meteryType == MeterTypeEnum.WaterMeter.ToString())
{ {
@ -758,7 +768,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
/// <summary> /// <summary>

View File

@ -4,6 +4,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;

View File

@ -27,14 +27,14 @@ namespace JiShe.CollectBus.Workers
{ {
_logger = logger; _logger = logger;
RecurringJobId = nameof(CreateToBeIssueTaskWorker); RecurringJobId = nameof(CreateToBeIssueTaskWorker);
CronExpression = $"{10}/* * * * *"; CronExpression = $"*/{1} * * * *";
this._scheduledMeterReadingService = scheduledMeterReadingService; this._scheduledMeterReadingService = scheduledMeterReadingService;
} }
public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken())
{ {
await _scheduledMeterReadingService.CreateToBeIssueTasks(); // await _scheduledMeterReadingService.CreateToBeIssueTasks();
} }
} }
} }