diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 287158f..5a665ff 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -207,26 +207,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
-#if DEBUG
- var timeDensity = "15";
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
+//#if DEBUG
+// var timeDensity = "15";
+// //获取缓存中的电表信息
+// var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*";
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
- List focusAddressDataLista = new List();
- foreach (var item in meterInfos)
- {
- focusAddressDataLista.Add(item.FocusAddress);
- }
+// var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+// var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
+// List focusAddressDataLista = new List();
+// foreach (var item in meterInfos)
+// {
+// focusAddressDataLista.Add(item.FocusAddress);
+// }
+
+// DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+// return;
+//#else
+// var meterInfos = await GetAmmeterInfoList(gatherCode);
+//#endif
- DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
- return;
-#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
-#endif
-
-
if (meterInfos == null || meterInfos.Count <= 0)
{
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空");
@@ -311,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
}
- keyValuePairs.TryAdd($"{ammeter.ID}", ammeter);
+ keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
@@ -757,10 +757,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeterInfo.AmmerterAddress,
- MeterId = ammeterInfo.ID,
+ MeterId = ammeterInfo.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeterInfo.FocusAddress,
- FocusID = ammeterInfo.FocusID,
+ FocusID = ammeterInfo.FocusId,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
@@ -772,7 +772,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
- keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords);
+ keyValuePairs.TryAdd($"{ammeterInfo.MeterId}_{tempItem}", meterReadingRecords);
}
//TimeSpan timeSpan = TimeSpan.FromMicroseconds(5);
//await Task.Delay(timeSpan);
@@ -910,22 +910,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
if (string.IsNullOrWhiteSpace(ammeter.AreaCode))
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信区号为空");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空");
continue;
}
if (string.IsNullOrWhiteSpace(ammeter.Address))
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址为空");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空");
continue;
}
if (Convert.ToInt32(ammeter.Address) > 65535)
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址无效,确保大于65535");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535");
continue;
}
if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033)
{
- _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},非有效测量点号({ammeter.MeteringCode})");
+ _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})");
continue;
}
@@ -1023,10 +1023,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
PendingCopyReadTime = pendingCopyReadTime,
CreationTime = currentTime,
MeterAddress = ammeter.AmmerterAddress,
- MeterId = ammeter.ID,
+ MeterId = ammeter.MeterId,
MeterType = MeterTypeEnum.Ammeter,
FocusAddress = ammeter.FocusAddress,
- FocusID = ammeter.FocusID,
+ FocusID = ammeter.FocusId,
AFN = aFN,
Fn = fn,
ItemCode = tempItem,
@@ -1038,7 +1038,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
- keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords);
+ keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
@@ -1097,7 +1097,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var subItem in item)
{
- keyValuePairs.TryAdd($"{subItem.ID}", subItem);
+ keyValuePairs.TryAdd($"{subItem.MeterId}", subItem);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index 91f7d4a..07f5526 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -80,11 +80,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400,
// FocusAddress = "402440506",
// Name = "张家祠工务(三相电表)",
- // FocusID = 95780,
+ // FocusId = 95780,
// DatabaseBusiID = 1,
// MeteringCode = 1,
// AmmerterAddress = "402410040506",
- // ID = 127035,
+ // MeterId = 127035,
// TypeName = 3,
// DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679",
// TimeDensity = 15,
@@ -94,11 +94,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
// Baudrate = 2400,
// FocusAddress = "542400504",
// Name = "五号配(长芦二所四排)(单相电表)",
- // FocusID = 69280,
+ // FocusId = 69280,
// DatabaseBusiID = 1,
// MeteringCode = 2,
// AmmerterAddress = "542410000504",
- // ID = 95594,
+ // MeterId = 95594,
// TypeName = 1,
// DataTypes = "581,589,592,597,601",
// TimeDensity = 15,
@@ -106,7 +106,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//return ammeterInfos;
- string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
+ string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID
FROM TB_GatherInfo(NOLOCK) AS A
INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0
INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100
@@ -114,10 +114,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
WHERE 1=1 and C.Special = 0 ";
//TODO 记得移除特殊表过滤
- //if (!string.IsNullOrWhiteSpace(gatherCode))
- //{
- // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
- //}
+ if (!string.IsNullOrWhiteSpace(gatherCode))
+ {
+ sql = $@"{sql} AND A.GatherCode = '{gatherCode}'";
+ }
return await SqlProvider.Instance.Change(DbEnum.EnergyDB)
.Ado
.QueryAsync(sql);
@@ -133,9 +133,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890")
{
string sql = $@"SELECT
- A.ID,
+ A.ID as MeterId,
A.Name,
- A.FocusID,
+ A.FocusID as FocusId,
A.MeteringCode,
A.Baudrate,
A.MeteringPort,
diff --git a/src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
similarity index 80%
rename from src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs
rename to src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
index 6ae8b91..ed42529 100644
--- a/src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs
+++ b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
@@ -6,7 +6,11 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Models
{
- public class GlobalPagedResult
+ ///
+ /// 缓存全局分页结果
+ ///
+ ///
+ public class BusCacheGlobalPagedResult
{
///
/// 数据集合
diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
index 05b654d..06a427b 100644
--- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
+++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
@@ -9,7 +9,7 @@ namespace JiShe.CollectBus.Common.Models
///
/// 设备缓存基础模型
///
- public class DeviceCacheBasicModel
+ public abstract class DeviceCacheBasicModel
{
///
/// 集中器Id
@@ -20,5 +20,10 @@ namespace JiShe.CollectBus.Common.Models
/// 表Id
///
public int MeterId { get; set; }
+
+ ///
+ /// 唯一标识
+ ///
+ public virtual string UniqueId => $"{FocusId}:{MeterId}";
}
}
diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
index fa21071..4ac667d 100644
--- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs
@@ -1,4 +1,5 @@
-using System;
+using JiShe.CollectBus.Common.Models;
+using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@@ -6,23 +7,13 @@ using System.Threading.Tasks;
namespace JiShe.CollectBus.Ammeters
{
- public class AmmeterInfo
- {
- ///
- /// 电表ID
- ///
- public int ID { get; set; }
-
+ public class AmmeterInfo: DeviceCacheBasicModel
+ {
///
/// 电表名称
///
public string Name { get; set; }
- ///
- /// 集中器ID
- ///
- public int FocusID { get; set; }
-
///
/// 集中器地址
///
diff --git a/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
index c735a9a..966192b 100644
--- a/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
+++ b/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs
@@ -1,4 +1,5 @@
using JiShe.CollectBus.Common.Enums;
+using JiShe.CollectBus.Common.Models;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -10,13 +11,8 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
///
/// 水表信息
///
- public class WatermeterInfo
- {
- ///
- /// 水表ID
- ///
- public int ID { get; set; }
-
+ public class WatermeterInfo: DeviceCacheBasicModel
+ {
///
/// 水表名称
///
@@ -25,12 +21,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter
/// 表密码
///
public string Password { get; set; }
-
- ///
- /// 集中器ID
- ///
- public int FocusID { get; set; }
-
+
///
/// 集中器地址
///
diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
index c76f9e6..1e07e20 100644
--- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
+++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
@@ -1,11 +1,13 @@
using FreeRedis;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.FreeRedisProvider.Options;
using Microsoft.Extensions.Options;
using System.Diagnostics;
using System.Text.Json;
using Volo.Abp.DependencyInjection;
+using static System.Runtime.InteropServices.JavaScript.JSType;
namespace JiShe.CollectBus.FreeRedisProvider
{
@@ -41,139 +43,38 @@ namespace JiShe.CollectBus.FreeRedisProvider
return Instance;
}
-
- //public async Task AddMeterZSetCacheData(string redisCacheKey, string redisCacheIndexKey, decimal score, T data)
- //{
- // if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
- // {
- // throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101");
- // }
-
- // // 生成唯一member标识
- // var member = data.Serialize();
-
- // // 计算score范围
- // decimal dataScore = (long)score << 32;
-
- // //// 事务操作
- // //using (var tran = FreeRedisProvider.Instance.Multi())
- // //{
- // // await tran.ZAddAsync(cacheKey, score,member);
- // // await tran.SAddAsync($"cat_index:{categoryId}", member);
- // // object[] ret = tran.Exec();
- // //}
-
- // using (var pipe = Instance.StartPipe())
- // {
- // pipe.ZAdd(redisCacheKey, dataScore, member);
- // pipe.SAdd(redisCacheIndexKey, member);
- // object[] ret = pipe.EndPipe();
- // }
-
- // await Task.CompletedTask;
- //}
-
- //public async Task> GetMeterZSetPagedData(
- //string redisCacheKey,
- //string redisCacheIndexKey,
- //decimal score,
- //int pageSize = 10,
- //int pageIndex = 1)
- //{
- // if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey))
- // {
- // throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101");
- // }
-
- // // 计算score范围
- // decimal minScore = (long)score << 32;
- // decimal maxScore = ((long)score + 1) << 32;
-
- // // 分页参数
- // int start = (pageIndex - 1) * pageSize;
-
- // // 查询主数据
- // var members = await Instance.ZRevRangeByScoreAsync(
- // redisCacheKey,
- // maxScore,
- // minScore,
- // offset: start,
- // count: pageSize
- // );
-
- // if (members == null)
- // {
- // throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102");
- // }
-
- // // 查询总数
- // var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore);
-
- // return new BusPagedResult
- // {
- // Items = members.Select(m =>
- // BusJsonSerializer.Deserialize(m)!).ToList(),
- // TotalCount = total,
- // PageIndex = pageIndex,
- // PageSize = pageSize
- // };
- //}
-
- /////
- ///// 删除数据示例
- /////
- /////
- ///// 分类
- /////
- /////
- /////
- //public async Task RemoveMeterZSetData(
- //string redisCacheKey,
- //string redisCacheIndexKey,
- //T data)
- //{
-
- // // 查询需要删除的member
- // var members = await Instance.SMembersAsync(redisCacheIndexKey);
- // var target = members.FirstOrDefault(m =>
- // BusJsonSerializer.Deserialize(m) == data);//泛型此处该如何处理?
-
- // if (target != null)
- // {
- // using (var trans = Instance.Multi())
- // {
- // trans.ZRem(redisCacheKey, target);
- // trans.SRem(redisCacheIndexKey, target);
- // trans.Exec();
- // }
- // }
-
- // await Task.CompletedTask;
- //}
-
-
- public async Task AddMeterZSetCacheData(
+ ///
+ /// 单个添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// 集中器索引Set缓存Key
+ /// 集中器排序索引ZSET缓存Key
+ /// 集中器采集频率分组全局索引ZSet缓存Key
+ /// 表计信息
+ /// 可选时间戳
+ ///
+ public async Task AddMeterCacheData(
string redisCacheKey,
- string redisCacheIndexKey,
- int categoryId, // 新增分类ID参数
+ string redisCacheFocusIndexKey,
+ string redisCacheScoresIndexKey,
+ string redisCacheGlobalIndexKey,
T data,
- DateTimeOffset? timestamp = null)
+ DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
// 参数校验增强
if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
- || string.IsNullOrWhiteSpace(redisCacheIndexKey))
+ || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
+ || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
+ || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
{
- throw new ArgumentException("Invalid parameters");
+ throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
}
-
- // 生成唯一member标识(带数据指纹)
- var member = $"{categoryId}:{Guid.NewGuid()}";
- var serializedData = data.Serialize();
-
+
// 计算组合score(分类ID + 时间戳)
var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
- long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks;
+ long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
//全局索引写入
long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
@@ -182,64 +83,83 @@ namespace JiShe.CollectBus.FreeRedisProvider
using (var trans = Instance.Multi())
{
// 主数据存储Hash
- trans.HSet(redisCacheKey, member, serializedData);
+ trans.HSet(redisCacheKey, data.UniqueId, data.Serialize());
+
+ // 分类索引
+ trans.SAdd(redisCacheFocusIndexKey, data.UniqueId);
// 排序索引使用ZSET
- trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member);
-
- // 分类索引
- trans.SAdd(redisCacheIndexKey, member);
+ trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.UniqueId);
//全局索引
- trans.ZAdd("global_data_all", globalScore, member);
+ trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.UniqueId);
var results = trans.Exec();
if (results == null || results.Length <= 0)
- throw new Exception("Transaction failed");
+ throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
}
await Task.CompletedTask;
}
+ ///
+ /// 批量添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// 集中器索引Set缓存Key
+ /// 集中器排序索引ZSET缓存Key
+ /// 集中器采集频率分组全局索引ZSet缓存Key
+ /// 数据集合
+ /// 可选时间戳
+ ///
public async Task BatchAddMeterData(
string redisCacheKey,
- string indexKey,
- IEnumerable items) where T : DeviceCacheBasicModel
+ string redisCacheFocusIndexKey,
+ string redisCacheScoresIndexKey,
+ string redisCacheGlobalIndexKey,
+ IEnumerable items,
+ DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
{
const int BATCH_SIZE = 1000; // 每批1000条
var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
- //foreach (var batch in items.Batch(BATCH_SIZE))
- //{
- // await semaphore.WaitAsync();
+ foreach (var batch in items.Batch(BATCH_SIZE))
+ {
+ await semaphore.WaitAsync();
- // _ = Task.Run(async () =>
- // {
- // using (var pipe = FreeRedisProvider.Instance.StartPipe())
- // {
- // foreach (var item in batch)
- // {
- // var member = $"{item.CategoryId}:{Guid.NewGuid()}";
- // long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks;
+ _ = Task.Run(() =>
+ {
+ using (var pipe = Instance.StartPipe())
+ {
+ foreach (var item in batch)
+ {
+ // 计算组合score(分类ID + 时间戳)
+ var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
- // // Hash主数据
- // pipe.HSet(redisCacheKey, member, item.Data.Serialize());
+ long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
- // // 分类索引
- // pipe.ZAdd($"{redisCacheKey}_scores", score, member);
+ //全局索引写入
+ long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
- // // 全局索引
- // pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member);
+ // 主数据存储Hash
+ pipe.HSet(redisCacheKey, item.UniqueId, item.Serialize());
- // // 分类快速索引
- // pipe.SAdd(indexKey, member);
- // }
- // pipe.EndPipe();
- // }
- // semaphore.Release();
- // });
- //}
+ // 分类索引
+ pipe.SAdd(redisCacheFocusIndexKey, item.UniqueId);
+
+ // 排序索引使用ZSET
+ pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.UniqueId);
+
+ //全局索引
+ pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.UniqueId);
+ }
+ pipe.EndPipe();
+ }
+ semaphore.Release();
+ });
+ }
await Task.CompletedTask;
}
@@ -411,7 +331,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
throw new Exception("删除操作失败");
}
- public async Task> GetGlobalPagedData(
+ public async Task> GetGlobalPagedData(
string redisCacheKey,
int pageSize = 10,
long? lastScore = null,
@@ -461,7 +381,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
? await GetNextCursor(zsetKey, actualMembers.Last(), descending)
: (null, null);
- return new GlobalPagedResult
+ return new BusCacheGlobalPagedResult
{
Items = dataTasks.Select(t => t.Result).ToList(),
HasNext = hasNext,
diff --git a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
index cc3ff02..36f9a81 100644
--- a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
+++ b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
@@ -1,4 +1,5 @@
using FreeRedis;
+using JiShe.CollectBus.Common.Models;
namespace JiShe.CollectBus.FreeRedisProvider
{
@@ -9,6 +10,44 @@ namespace JiShe.CollectBus.FreeRedisProvider
///
///
RedisClient Instance { get; set; }
+
+ ///
+ /// 单个添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// 集中器索引Set缓存Key
+ /// 集中器排序索引ZSET缓存Key
+ /// 集中器采集频率分组全局索引ZSet缓存Key
+ /// 表计信息
+ /// 可选时间戳
+ ///
+ Task AddMeterCacheData(
+ string redisCacheKey,
+ string redisCacheFocusIndexKey,
+ string redisCacheScoresIndexKey,
+ string redisCacheGlobalIndexKey,
+ T data,
+ DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
+
+ ///
+ /// 批量添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// 集中器索引Set缓存Key
+ /// 集中器排序索引ZSET缓存Key
+ /// 集中器采集频率分组全局索引ZSet缓存Key
+ /// 数据集合
+ /// 可选时间戳
+ ///
+ Task BatchAddMeterData(
+ string redisCacheKey,
+ string redisCacheFocusIndexKey,
+ string redisCacheScoresIndexKey,
+ string redisCacheGlobalIndexKey,
+ IEnumerable items,
+ DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
}
}
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index a5b2e15..c922f8d 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -128,7 +128,7 @@
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
- "ServerTagName": "JiSheCollectBus",
+ "ServerTagName": "JiSheCollectBus2",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
}
\ No newline at end of file