优化Redis大批量数据添加

This commit is contained in:
ChenYi 2025-04-15 16:48:35 +08:00
parent 0037291350
commit abde2b5161
9 changed files with 178 additions and 228 deletions

View File

@ -207,26 +207,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// <returns></returns> /// <returns></returns>
public virtual async Task InitAmmeterCacheData(string gatherCode = "") public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{ {
#if DEBUG //#if DEBUG
var timeDensity = "15"; // var timeDensity = "15";
//获取缓存中的电表信息 // //获取缓存中的电表信息
var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*"; // var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); // var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); // var meterInfos = await GetMeterRedisCacheListData<AmmeterInfo>(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
List<string> focusAddressDataLista = new List<string>(); // List<string> focusAddressDataLista = new List<string>();
foreach (var item in meterInfos) // foreach (var item in meterInfos)
{ // {
focusAddressDataLista.Add(item.FocusAddress); // focusAddressDataLista.Add(item.FocusAddress);
} // }
// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
// return;
//#else
// var meterInfos = await GetAmmeterInfoList(gatherCode);
//#endif
DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode); var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
if (meterInfos == null || meterInfos.Count <= 0) if (meterInfos == null || meterInfos.Count <= 0)
{ {
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@ -311,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
} }
} }
keyValuePairs.TryAdd($"{ammeter.ID}", ammeter); keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
@ -757,10 +757,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress, MeterAddress = ammeterInfo.AmmerterAddress,
MeterId = ammeterInfo.ID, MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress, FocusAddress = ammeterInfo.FocusAddress,
FocusID = ammeterInfo.FocusID, FocusID = ammeterInfo.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
ItemCode = tempItem, ItemCode = tempItem,
@ -772,7 +772,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create()); meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords);
} }
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan); //await Task.Delay(timeSpan);
@ -910,22 +910,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeter.AreaCode)) if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信区号为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
continue; continue;
} }
if (string.IsNullOrWhiteSpace(ammeter.Address)) if (string.IsNullOrWhiteSpace(ammeter.Address))
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址为空"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
continue; continue;
} }
if (Convert.ToInt32(ammeter.Address) > 65535) if (Convert.ToInt32(ammeter.Address) > 65535)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址无效,确保大于65535"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
continue; continue;
} }
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033) if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
{ {
_logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},非有效测量点号({ammeter.MeteringCode})"); _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
continue; continue;
} }
@ -1023,10 +1023,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = pendingCopyReadTime, PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime, CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress, MeterAddress = ammeter.AmmerterAddress,
MeterId = ammeter.ID, MeterId = ammeter.MeterId,
MeterType = MeterTypeEnum.Ammeter, MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress, FocusAddress = ammeter.FocusAddress,
FocusID = ammeter.FocusID, FocusID = ammeter.FocusId,
AFN = aFN, AFN = aFN,
Fn = fn, Fn = fn,
ItemCode = tempItem, ItemCode = tempItem,
@ -1038,7 +1038,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}; };
meterReadingRecords.CreateDataId(GuidGenerator.Create()); meterReadingRecords.CreateDataId(GuidGenerator.Create());
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords); keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }
@ -1097,7 +1097,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var subItem in item) foreach (var subItem in item)
{ {
keyValuePairs.TryAdd($"{subItem.ID}", subItem); keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
} }
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
} }

View File

@ -80,11 +80,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400, // Baudrate = 2400,
// FocusAddress = "402440506", // FocusAddress = "402440506",
// Name = "张家祠工务(三相电表)", // Name = "张家祠工务(三相电表)",
// FocusID = 95780, // FocusId = 95780,
// DatabaseBusiID = 1, // DatabaseBusiID = 1,
// MeteringCode = 1, // MeteringCode = 1,
// AmmerterAddress = "402410040506", // AmmerterAddress = "402410040506",
// ID = 127035, // MeterId = 127035,
// TypeName = 3, // TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15, // TimeDensity = 15,
@ -94,11 +94,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400, // Baudrate = 2400,
// FocusAddress = "542400504", // FocusAddress = "542400504",
// Name = "五号配(长芦二所四排)(单相电表)", // Name = "五号配(长芦二所四排)(单相电表)",
// FocusID = 69280, // FocusId = 69280,
// DatabaseBusiID = 1, // DatabaseBusiID = 1,
// MeteringCode = 2, // MeteringCode = 2,
// AmmerterAddress = "542410000504", // AmmerterAddress = "542410000504",
// ID = 95594, // MeterId = 95594,
// TypeName = 1, // TypeName = 1,
// DataTypes = "581,589,592,597,601", // DataTypes = "581,589,592,597,601",
// TimeDensity = 15, // TimeDensity = 15,
@ -106,7 +106,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return ammeterInfos; //return ammeterInfos;
string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
@ -114,10 +114,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 "; WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤 //TODO 记得移除特殊表过滤
//if (!string.IsNullOrWhiteSpace(gatherCode)) if (!string.IsNullOrWhiteSpace(gatherCode))
//{ {
// sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
//} }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB) return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado .Ado
.QueryAsync<AmmeterInfo>(sql); .QueryAsync<AmmeterInfo>(sql);
@ -133,9 +133,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") public override async Task<List<WatermeterInfo>> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{ {
string sql = $@"SELECT string sql = $@"SELECT
A.ID, A.ID as MeterId,
A.Name, A.Name,
A.FocusID, A.FocusID as FocusId,
A.MeteringCode, A.MeteringCode,
A.Baudrate, A.Baudrate,
A.MeteringPort, A.MeteringPort,

View File

@ -6,7 +6,11 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models namespace JiShe.CollectBus.Common.Models
{ {
public class GlobalPagedResult<T> /// <summary>
/// 缓存全局分页结果
/// </summary>
/// <typeparam name="T"></typeparam>
public class BusCacheGlobalPagedResult<T>
{ {
/// <summary> /// <summary>
/// 数据集合 /// 数据集合

View File

@ -9,7 +9,7 @@ namespace JiShe.CollectBus.Common.Models
/// <summary> /// <summary>
/// 设备缓存基础模型 /// 设备缓存基础模型
/// </summary> /// </summary>
public class DeviceCacheBasicModel public abstract class DeviceCacheBasicModel
{ {
/// <summary> /// <summary>
/// 集中器Id /// 集中器Id
@ -20,5 +20,10 @@ namespace JiShe.CollectBus.Common.Models
/// 表Id /// 表Id
/// </summary> /// </summary>
public int MeterId { get; set; } public int MeterId { get; set; }
/// <summary>
/// 唯一标识
/// </summary>
public virtual string UniqueId => $"{FocusId}:{MeterId}";
} }
} }

View File

@ -1,4 +1,5 @@
using System; using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -6,23 +7,13 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters namespace JiShe.CollectBus.Ammeters
{ {
public class AmmeterInfo public class AmmeterInfo: DeviceCacheBasicModel
{ {
/// <summary>
/// 电表ID
/// </summary>
public int ID { get; set; }
/// <summary> /// <summary>
/// 电表名称 /// 电表名称
/// </summary> /// </summary>
public string Name { get; set; } public string Name { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary> /// <summary>
/// 集中器地址 /// 集中器地址
/// </summary> /// </summary>

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Models;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -10,13 +11,8 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// <summary> /// <summary>
/// 水表信息 /// 水表信息
/// </summary> /// </summary>
public class WatermeterInfo public class WatermeterInfo: DeviceCacheBasicModel
{ {
/// <summary>
/// 水表ID
/// </summary>
public int ID { get; set; }
/// <summary> /// <summary>
/// 水表名称 /// 水表名称
/// </summary> /// </summary>
@ -25,12 +21,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 表密码 /// 表密码
/// </summary> /// </summary>
public string Password { get; set; } public string Password { get; set; }
/// <summary>
/// 集中器ID
/// </summary>
public int FocusID { get; set; }
/// <summary> /// <summary>
/// 集中器地址 /// 集中器地址
/// </summary> /// </summary>

View File

@ -1,11 +1,13 @@
using FreeRedis; using FreeRedis;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options; using JiShe.CollectBus.FreeRedisProvider.Options;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using System.Diagnostics; using System.Diagnostics;
using System.Text.Json; using System.Text.Json;
using Volo.Abp.DependencyInjection; using Volo.Abp.DependencyInjection;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedisProvider
{ {
@ -41,139 +43,38 @@ namespace JiShe.CollectBus.FreeRedisProvider
return Instance; return Instance;
} }
/// <summary>
//public async Task AddMeterZSetCacheData<T>(string redisCacheKey, string redisCacheIndexKey, decimal score, T data) /// 单个添加数据
//{ /// </summary>
// if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) /// <typeparam name="T"></typeparam>
// { /// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
// throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101"); /// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
// } /// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
/// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
// // 生成唯一member标识 /// <param name="data">表计信息</param>
// var member = data.Serialize(); /// <param name="timestamp">可选时间戳</param>
/// <returns></returns>
// // 计算score范围 public async Task AddMeterCacheData<T>(
// decimal dataScore = (long)score << 32;
// //// 事务操作
// //using (var tran = FreeRedisProvider.Instance.Multi())
// //{
// // await tran.ZAddAsync(cacheKey, score,member);
// // await tran.SAddAsync($"cat_index:{categoryId}", member);
// // object[] ret = tran.Exec();
// //}
// using (var pipe = Instance.StartPipe())
// {
// pipe.ZAdd(redisCacheKey, dataScore, member);
// pipe.SAdd(redisCacheIndexKey, member);
// object[] ret = pipe.EndPipe();
// }
// await Task.CompletedTask;
//}
//public async Task<BusPagedResult<T>> GetMeterZSetPagedData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//decimal score,
//int pageSize = 10,
//int pageIndex = 1)
//{
// if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101");
// }
// // 计算score范围
// decimal minScore = (long)score << 32;
// decimal maxScore = ((long)score + 1) << 32;
// // 分页参数
// int start = (pageIndex - 1) * pageSize;
// // 查询主数据
// var members = await Instance.ZRevRangeByScoreAsync(
// redisCacheKey,
// maxScore,
// minScore,
// offset: start,
// count: pageSize
// );
// if (members == null)
// {
// throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102");
// }
// // 查询总数
// var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore);
// return new BusPagedResult<T>
// {
// Items = members.Select(m =>
// BusJsonSerializer.Deserialize<T>(m)!).ToList(),
// TotalCount = total,
// PageIndex = pageIndex,
// PageSize = pageSize
// };
//}
///// <summary>
///// 删除数据示例
///// </summary>
///// <typeparam name="T"></typeparam>
///// <param name="redisCacheKey">分类</param>
///// <param name="redisCacheIndexKey"></param>
///// <param name="data"></param>
///// <returns></returns>
//public async Task RemoveMeterZSetData<T>(
//string redisCacheKey,
//string redisCacheIndexKey,
//T data)
//{
// // 查询需要删除的member
// var members = await Instance.SMembersAsync(redisCacheIndexKey);
// var target = members.FirstOrDefault(m =>
// BusJsonSerializer.Deserialize<T>(m) == data);//泛型此处该如何处理?
// if (target != null)
// {
// using (var trans = Instance.Multi())
// {
// trans.ZRem(redisCacheKey, target);
// trans.SRem(redisCacheIndexKey, target);
// trans.Exec();
// }
// }
// await Task.CompletedTask;
//}
public async Task AddMeterZSetCacheData<T>(
string redisCacheKey, string redisCacheKey,
string redisCacheIndexKey, string redisCacheFocusIndexKey,
int categoryId, // 新增分类ID参数 string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
T data, T data,
DateTimeOffset? timestamp = null) DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{ {
// 参数校验增强 // 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
|| string.IsNullOrWhiteSpace(redisCacheIndexKey)) || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
|| string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{ {
throw new ArgumentException("Invalid parameters"); throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
} }
// 生成唯一member标识带数据指纹
var member = $"{categoryId}:{Guid.NewGuid()}";
var serializedData = data.Serialize();
// 计算组合score分类ID + 时间戳) // 计算组合score分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks; long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入 //全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
@ -182,64 +83,83 @@ namespace JiShe.CollectBus.FreeRedisProvider
using (var trans = Instance.Multi()) using (var trans = Instance.Multi())
{ {
// 主数据存储Hash // 主数据存储Hash
trans.HSet(redisCacheKey, member, serializedData); trans.HSet(redisCacheKey, data.UniqueId, data.Serialize());
// 分类索引
trans.SAdd(redisCacheFocusIndexKey, data.UniqueId);
// 排序索引使用ZSET // 排序索引使用ZSET
trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member); trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.UniqueId);
// 分类索引
trans.SAdd(redisCacheIndexKey, member);
//全局索引 //全局索引
trans.ZAdd("global_data_all", globalScore, member); trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.UniqueId);
var results = trans.Exec(); var results = trans.Exec();
if (results == null || results.Length <= 0) if (results == null || results.Length <= 0)
throw new Exception("Transaction failed"); throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
} }
await Task.CompletedTask; await Task.CompletedTask;
} }
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
/// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
/// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
/// <param name="items">数据集合</param>
/// <param name="timestamp">可选时间戳</param>
/// <returns></returns>
public async Task BatchAddMeterData<T>( public async Task BatchAddMeterData<T>(
string redisCacheKey, string redisCacheKey,
string indexKey, string redisCacheFocusIndexKey,
IEnumerable<T> items) where T : DeviceCacheBasicModel string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
IEnumerable<T> items,
DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{ {
const int BATCH_SIZE = 1000; // 每批1000条 const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
//foreach (var batch in items.Batch(BATCH_SIZE)) foreach (var batch in items.Batch(BATCH_SIZE))
//{ {
// await semaphore.WaitAsync(); await semaphore.WaitAsync();
// _ = Task.Run(async () => _ = Task.Run(() =>
// { {
// using (var pipe = FreeRedisProvider.Instance.StartPipe()) using (var pipe = Instance.StartPipe())
// { {
// foreach (var item in batch) foreach (var item in batch)
// { {
// var member = $"{item.CategoryId}:{Guid.NewGuid()}"; // 计算组合score分类ID + 时间戳)
// long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks; var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
// // Hash主数据 long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
// pipe.HSet(redisCacheKey, member, item.Data.Serialize());
// // 分类索引 //全局索引写入
// pipe.ZAdd($"{redisCacheKey}_scores", score, member); long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
// // 全局索引 // 主数据存储Hash
// pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member); pipe.HSet(redisCacheKey, item.UniqueId, item.Serialize());
// // 分类快速索引 // 分类索引
// pipe.SAdd(indexKey, member); pipe.SAdd(redisCacheFocusIndexKey, item.UniqueId);
// }
// pipe.EndPipe(); // 排序索引使用ZSET
// } pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.UniqueId);
// semaphore.Release();
// }); //全局索引
//} pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.UniqueId);
}
pipe.EndPipe();
}
semaphore.Release();
});
}
await Task.CompletedTask; await Task.CompletedTask;
} }
@ -411,7 +331,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
throw new Exception("删除操作失败"); throw new Exception("删除操作失败");
} }
public async Task<GlobalPagedResult<T>> GetGlobalPagedData<T>( public async Task<BusCacheGlobalPagedResult<T>> GetGlobalPagedData<T>(
string redisCacheKey, string redisCacheKey,
int pageSize = 10, int pageSize = 10,
long? lastScore = null, long? lastScore = null,
@ -461,7 +381,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
? await GetNextCursor(zsetKey, actualMembers.Last(), descending) ? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
: (null, null); : (null, null);
return new GlobalPagedResult<T> return new BusCacheGlobalPagedResult<T>
{ {
Items = dataTasks.Select(t => t.Result).ToList(), Items = dataTasks.Select(t => t.Result).ToList(),
HasNext = hasNext, HasNext = hasNext,

View File

@ -1,4 +1,5 @@
using FreeRedis; using FreeRedis;
using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.FreeRedisProvider namespace JiShe.CollectBus.FreeRedisProvider
{ {
@ -9,6 +10,44 @@ namespace JiShe.CollectBus.FreeRedisProvider
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
RedisClient Instance { get; set; } RedisClient Instance { get; set; }
/// <summary>
/// 单个添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
/// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
/// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
/// <param name="data">表计信息</param>
/// <param name="timestamp">可选时间戳</param>
/// <returns></returns>
Task AddMeterCacheData<T>(
string redisCacheKey,
string redisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
T data,
DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
/// <summary>
/// 批量添加数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="redisCacheKey">主数据存储Hash缓存Key</param>
/// <param name="redisCacheFocusIndexKey">集中器索引Set缓存Key</param>
/// <param name="redisCacheScoresIndexKey">集中器排序索引ZSET缓存Key</param>
/// <param name="redisCacheGlobalIndexKey">集中器采集频率分组全局索引ZSet缓存Key</param>
/// <param name="items">数据集合</param>
/// <param name="timestamp">可选时间戳</param>
/// <returns></returns>
Task BatchAddMeterData<T>(
string redisCacheKey,
string redisCacheFocusIndexKey,
string redisCacheScoresIndexKey,
string redisCacheGlobalIndexKey,
IEnumerable<T> items,
DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
} }
} }

View File

@ -128,7 +128,7 @@
"OpenDebugMode": true, "OpenDebugMode": true,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"ServerTagName": "JiSheCollectBus", "ServerTagName": "JiSheCollectBus2",
"KafkaReplicationFactor": 3, "KafkaReplicationFactor": 3,
"NumPartitions": 30 "NumPartitions": 30
} }