新增设备缓存信息Redis数据结构

This commit is contained in:
ChenYi 2025-05-12 23:18:02 +08:00
parent fa84f42ca2
commit 705a5cbbf7
4 changed files with 103 additions and 7 deletions

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Ammeters;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -34,14 +35,31 @@ namespace JiShe.CollectBus.Application.Contracts
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param> /// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param> /// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param> /// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param> /// <param name="items">待缓存数据集合</param>
/// <returns></returns> /// <returns></returns>
Task BatchInsertDataAsync<T>( Task BatchInsertDataAsync<T>(
string redisHashCacheKey, string redisHashCacheKey,
string redisSetIndexCacheKey, string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey, string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel; IEnumerable<T> items) where T : DeviceCacheBasicModel;
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel;
/// <summary> /// <summary>
/// 删除缓存信息 /// 删除缓存信息
/// </summary> /// </summary>

View File

@ -16,6 +16,7 @@ using Volo.Abp.DependencyInjection;
using static FreeSql.Internal.GlobalFilter; using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType; using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application; using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
using JiShe.CollectBus.IotSystems.Ammeters;
namespace JiShe.CollectBus.RedisDataCache namespace JiShe.CollectBus.RedisDataCache
{ {
@ -102,13 +103,16 @@ namespace JiShe.CollectBus.RedisDataCache
string redisHashCacheKey, string redisHashCacheKey,
string redisSetIndexCacheKey, string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey, string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel IEnumerable<T> items) where T : DeviceCacheBasicModel
{ {
if (items == null if (items == null
|| items.Count() <= 0 || items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey) || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
)
{ {
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101"); _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return; return;
@ -131,10 +135,65 @@ namespace JiShe.CollectBus.RedisDataCache
pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize()); pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
// Set索引缓存 // Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, item.MemberId); pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}");
// ZSET索引缓存Key // ZSET索引缓存Key
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId); pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize());
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask;
}
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
public async Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
)
{
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return;
}
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
foreach (var batch in items.Batch(BATCH_SIZE))
{
await semaphore.WaitAsync();
_ = Task.Run(() =>
{
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}");
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize());
} }
pipe.EndPipe(); pipe.EndPipe();
} }

View File

@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
return; //return;
// 创建取消令牌源 // 创建取消令牌源
//var cts = new CancellationTokenSource(); //var cts = new CancellationTokenSource();
@ -412,6 +412,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
} }
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
Dictionary<string, List<AmmeterInfo>> keyValuePairs = new Dictionary<string, List<AmmeterInfo>>();
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{ {
@ -476,13 +478,29 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
ammeterInfos.Add(ammeter); ammeterInfos.Add(ammeter);
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
keyValuePairs[ammeter.FocusAddress] = new List<AmmeterInfo>() {ammeter };
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter);
}
} }
} }
//await _redisDataCacheService.BatchInsertDataAsync2<AmmeterInfo>(
// redisCacheMeterInfoSetIndexKey,
// redisDeviceInfoHashCacheKey,
// keyValuePairs);
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>( await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey, redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey, redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos); redisCacheMeterInfoZSetScoresIndexKey,
redisDeviceInfoHashCacheKey,
ammeterInfos);
} }
//初始化设备组负载控制 //初始化设备组负载控制
@ -1126,7 +1144,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
} }
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{ {
List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>(); List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();
@ -1155,6 +1173,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>( await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
redisCacheMeterInfoHashKey, redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey, redisCacheMeterInfoSetIndexKey,
redisDeviceInfoHashCacheKey,
redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo); redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
} }
} }

View File

@ -29,9 +29,9 @@ namespace JiShe.CollectBus.Common.Consts
public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
/// <summary> /// <summary>
/// 集中器连接信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记 /// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary> /// </summary>
public const string ConcentratorCacheHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:Concentrator"; public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo";
public const string MeterInfo = "MeterInfo"; public const string MeterInfo = "MeterInfo";