Compare commits

..

No commits in common. "13fd36c647d84ba86badadedad79f9a52010e854" and "5b4673adef9f8cd7a4f35b71c1f7f13c0247a0ce" have entirely different histories.

4 changed files with 7 additions and 103 deletions

View File

@ -1,5 +1,4 @@
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IotSystems.Ammeters;
using System;
using System.Collections.Generic;
using System.Linq;
@ -35,31 +34,14 @@ namespace JiShe.CollectBus.Application.Contracts
/// <param name="redisHashCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisZSetScoresIndexCacheKey">ZSET索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
Task BatchInsertDataAsync<T>(
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel;
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="redisDeviceInfoHashCacheKey">设备缓存信息</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel;
/// <summary>
/// 删除缓存信息
/// </summary>

View File

@ -16,7 +16,6 @@ using Volo.Abp.DependencyInjection;
using static FreeSql.Internal.GlobalFilter;
using static System.Runtime.InteropServices.JavaScript.JSType;
using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application;
using JiShe.CollectBus.IotSystems.Ammeters;
namespace JiShe.CollectBus.RedisDataCache
{
@ -103,16 +102,13 @@ namespace JiShe.CollectBus.RedisDataCache
string redisHashCacheKey,
string redisSetIndexCacheKey,
string redisZSetScoresIndexCacheKey,
string redisDeviceInfoHashCacheKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisHashCacheKey)
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
)
|| string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
{
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return;
@ -135,65 +131,10 @@ namespace JiShe.CollectBus.RedisDataCache
pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize());
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}");
pipe.SAdd(redisSetIndexCacheKey, item.MemberId);
// ZSET索引缓存Key
pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId);
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize());
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask;
}
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisSetIndexCacheKey">Set索引缓存Key</param>
/// <param name="items">待缓存数据集合</param>
/// <returns></returns>
public async Task BatchInsertDataAsync2<T>(
string redisSetIndexCacheKey,
string redisDeviceInfoHashCacheKey,
Dictionary<string, List<T>> items) where T : DeviceCacheBasicModel
{
if (items == null
|| items.Count() <= 0
|| string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
|| string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey)
)
{
_logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
return;
}
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
foreach (var batch in items.Batch(BATCH_SIZE))
{
await semaphore.WaitAsync();
_ = Task.Run(() =>
{
using (var pipe = Instance.StartPipe())
{
foreach (var item in batch)
{
// Set索引缓存
pipe.SAdd(redisSetIndexCacheKey, $"{item.Value.First().TimeDensity.ToString().PadLeft(2, '0')}:{item.Value.First().FocusAddress}");
//设备信息缓存
pipe.HSet(redisDeviceInfoHashCacheKey, item.Key, item.Value.Serialize());
}
pipe.EndPipe();
}

View File

@ -322,7 +322,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
//return;
return;
// 创建取消令牌源
//var cts = new CancellationTokenSource();
@ -412,8 +412,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
Dictionary<string, List<AmmeterInfo>> keyValuePairs = new Dictionary<string, List<AmmeterInfo>>();
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
@ -478,29 +476,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
ammeterInfos.Add(ammeter);
if (!keyValuePairs.ContainsKey(ammeter.FocusAddress))
{
keyValuePairs[ammeter.FocusAddress] = new List<AmmeterInfo>() {ammeter };
}
else
{
keyValuePairs[ammeter.FocusAddress].Add(ammeter);
}
}
}
//await _redisDataCacheService.BatchInsertDataAsync2<AmmeterInfo>(
// redisCacheMeterInfoSetIndexKey,
// redisDeviceInfoHashCacheKey,
// keyValuePairs);
await _redisDataCacheService.BatchInsertDataAsync<AmmeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisCacheMeterInfoZSetScoresIndexKey,
redisDeviceInfoHashCacheKey,
ammeterInfos);
redisCacheMeterInfoZSetScoresIndexKey, ammeterInfos);
}
//初始化设备组负载控制
@ -1144,7 +1126,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}";
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
List<WatermeterInfo> watermeterInfo = new List<WatermeterInfo>();
@ -1173,7 +1155,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await _redisDataCacheService.BatchInsertDataAsync<WatermeterInfo>(
redisCacheMeterInfoHashKey,
redisCacheMeterInfoSetIndexKey,
redisDeviceInfoHashCacheKey,
redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo);
}
}

View File

@ -29,9 +29,9 @@ namespace JiShe.CollectBus.Common.Consts
public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
/// <summary>
/// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记
/// 集中器连接信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary>
public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo";
public const string ConcentratorCacheHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:Concentrator";
public const string MeterInfo = "MeterInfo";