From 1a275ec9c3ca87effa7ad64d0c640e5428f396eb Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Wed, 16 Apr 2025 17:36:46 +0800
Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E5=86=99=E5=85=A5=E6=B5=8B?=
=?UTF-8?q?=E8=AF=95=E8=B0=83=E4=BC=98?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../RedisDataCache/IRedisDataCacheService.cs | 176 ++++
.../CollectBusAppService.cs | 4 +-
.../RedisDataCache/RedisDataCacheService.cs | 745 ++++++++++++++
.../Samples/SampleAppService.cs | 57 +-
.../BasicScheduledMeterReadingService.cs | 90 +-
...nergySystemScheduledMeterReadingService.cs | 16 +-
.../Consts/RedisConst.cs | 29 +-
.../Models/BusCacheGlobalPagedResult.cs | 16 +
.../Models/DeviceCacheBasicModel.cs | 7 +-
.../FreeRedisProvider.cs | 957 ++++++++----------
.../IFreeRedisProvider.cs | 100 +-
.../Options/FreeRedisOptions.cs | 5 +
src/JiShe.CollectBus.Host/appsettings.json | 3 +-
13 files changed, 1491 insertions(+), 714 deletions(-)
create mode 100644 src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
create mode 100644 src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
new file mode 100644
index 0000000..c6f3613
--- /dev/null
+++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs
@@ -0,0 +1,176 @@
+using JiShe.CollectBus.Common.Models;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Application.Contracts
+{
+ ///
+ /// 数据缓存服务接口
+ ///
+ public interface IRedisDataCacheService
+ {
+ ///
+ /// 单个添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待缓存数据
+ ///
+ Task InsertDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T data) where T : DeviceCacheBasicModel;
+
+ ///
+ /// 批量添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待缓存数据集合
+ ///
+ Task BatchInsertDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ IEnumerable items) where T : DeviceCacheBasicModel;
+
+ ///
+ /// 删除缓存信息
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 已缓存数据
+ ///
+ Task RemoveCacheDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T data) where T : DeviceCacheBasicModel;
+
+ ///
+ /// 修改缓存信息,映射关系未发生改变
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待修改缓存数据
+ ///
+ Task ModifyDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T newData) where T : DeviceCacheBasicModel;
+
+
+ ///
+ /// 修改缓存信息,映射关系已改变
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// 旧的映射关系
+ /// ZSET索引缓存Key
+ /// 待修改缓存数据
+ ///
+ Task ModifyDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string oldMemberId,
+ string redisZSetScoresIndexCacheKey,
+ T newData) where T : DeviceCacheBasicModel;
+
+ /////
+ ///// 通过集中器与表计信息排序索引获取数据
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// ZSET索引缓存Key
+ ///// 分页尺寸
+ ///// 最后一个索引
+ ///// 最后一个唯一标识
+ ///// 排序方式
+ /////
+ //Task> GetPagedData(
+ //string redisHashCacheKey,
+ //string redisZSetScoresIndexCacheKey,
+ //IEnumerable focusIds,
+ //int pageSize = 10,
+ //decimal? lastScore = null,
+ //string lastMember = null,
+ //bool descending = true)
+ //where T : DeviceCacheBasicModel;
+
+
+ ///
+ /// 通过ZSET索引获取数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// ZSET索引缓存Key
+ /// 分页尺寸
+ /// 最后一个索引
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ Task> GetAllPagedData(
+ string redisHashCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel;
+
+
+ /////
+ ///// 游标分页查询
+ /////
+ ///// 排序索引ZSET缓存Key
+ ///// 分页数量
+ ///// 开始索引
+ ///// 开始唯一标识
+ ///// 排序方式
+ /////
+ //Task<(List Members, bool HasNext)> GetPagedMembers(
+ // string redisZSetScoresIndexCacheKey,
+ // int pageSize,
+ // decimal? startScore,
+ // string excludeMember,
+ // bool descending);
+
+ /////
+ ///// 批量获取指定分页的数据
+ /////
+ /////
+ ///// Hash表缓存key
+ ///// Hash表字段集合
+ /////
+ //Task> BatchGetData(
+ // string redisHashCacheKey,
+ // IEnumerable members)
+ // where T : DeviceCacheBasicModel;
+
+ /////
+ ///// 获取下一页游标
+ /////
+ ///// 排序索引ZSET缓存Key
+ ///// 最后一个唯一标识
+ ///// 排序方式
+ /////
+ //Task GetNextScore(
+ // string redisZSetScoresIndexCacheKey,
+ // string lastMember,
+ // bool descending);
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs
index 6ecc509..ad5b585 100644
--- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs
+++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs
@@ -79,7 +79,7 @@ public abstract class CollectBusAppService : ApplicationService
{
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}";
string focusAddress = key.Replace(redisCacheKey, "");
var meterHashs = new Dictionary();
@@ -182,7 +182,7 @@ public abstract class CollectBusAppService : ApplicationService
string key = (string)item[0];
object[] fieldsAndValues = (object[])item[1];
var redisCacheKey = string.Format(
- RedisConst.CacheMeterInfoKey,
+ RedisConst.CacheMeterInfoHashKey,
systemType,
serverTagName,
meterType,
diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
new file mode 100644
index 0000000..4d78706
--- /dev/null
+++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs
@@ -0,0 +1,745 @@
+using Confluent.Kafka;
+using FreeRedis;
+using JiShe.CollectBus.Application.Contracts;
+using JiShe.CollectBus.Common.Extensions;
+using JiShe.CollectBus.Common.Helpers;
+using JiShe.CollectBus.Common.Models;
+using JiShe.CollectBus.FreeRedisProvider;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Volo.Abp.DependencyInjection;
+
+namespace JiShe.CollectBus.RedisDataCache
+{
+ ///
+ /// 数据缓存服务接口
+ ///
+ public class RedisDataCacheService : IRedisDataCacheService, ITransientDependency
+ {
+ private readonly IFreeRedisProvider _freeRedisProvider;
+ private readonly ILogger _logger;
+ private RedisClient Instance { get; set; }
+
+ ///
+ /// 数据缓存服务接口
+ ///
+ ///
+ ///
+ public RedisDataCacheService(IFreeRedisProvider freeRedisProvider,
+ ILogger logger)
+ {
+ this._freeRedisProvider = freeRedisProvider;
+ this._logger = logger;
+
+ Instance = _freeRedisProvider.Instance;
+ }
+
+ ///
+ /// 单个添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待缓存数据
+ ///
+ public async Task InsertDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T data) where T : DeviceCacheBasicModel
+ {
+ // 参数校验增强
+ if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey)
+ || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
+ || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101");
+ return;
+ }
+
+ // 使用事务保证原子性
+ using (var trans = Instance.Multi())
+ {
+ // 主数据存储Hash
+ trans.HSet(redisHashCacheKey, data.MemberID, data.Serialize());
+
+ // 集中器号分组索引Set缓存
+ trans.SAdd(redisSetIndexCacheKey, data.MemberID);
+
+ // 集中器与表计信息排序索引ZSET缓存Key
+ trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberID);
+
+ var results = trans.Exec();
+
+ if (results == null || results.Length <= 0)
+ {
+ _logger.LogError($"{nameof(InsertDataAsync)} 添加事务提交失败,-102");
+ }
+ }
+
+ await Task.CompletedTask;
+ }
+
+ ///
+ /// 批量添加数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待缓存数据集合
+ ///
+ public async Task BatchInsertDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ IEnumerable items) where T : DeviceCacheBasicModel
+ {
+ if (items == null
+ || items.Count() <= 0
+ || string.IsNullOrWhiteSpace(redisHashCacheKey)
+ || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
+ || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101");
+ return;
+ }
+
+ const int BATCH_SIZE = 1000; // 每批1000条
+ var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
+
+ foreach (var batch in items.Batch(BATCH_SIZE))
+ {
+ await semaphore.WaitAsync();
+
+ _ = Task.Run(() =>
+ {
+ using (var pipe = Instance.StartPipe())
+ {
+ foreach (var item in batch)
+ {
+ // 主数据存储Hash
+ pipe.HSet(redisHashCacheKey, item.MemberID, item.Serialize());
+
+ // Set索引缓存
+ pipe.SAdd(redisSetIndexCacheKey, item.MemberID);
+
+ // ZSET索引缓存Key
+ pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberID);
+ }
+ pipe.EndPipe();
+ }
+ semaphore.Release();
+ });
+ }
+
+ await Task.CompletedTask;
+ }
+
+ ///
+ /// 删除缓存信息
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 已缓存数据
+ ///
+ public async Task RemoveCacheDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T data) where T : DeviceCacheBasicModel
+ {
+ if (data == null
+ || string.IsNullOrWhiteSpace(redisHashCacheKey)
+ || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
+ || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101");
+ return;
+ }
+
+ const string luaScript = @"
+ local hashCacheKey = KEYS[1]
+ local setIndexCacheKey = KEYS[2]
+ local zsetScoresIndexCacheKey = KEYS[3]
+ local member = ARGV[1]
+
+ local deleted = 0
+ if redis.call('HDEL', hashCacheKey, member) > 0 then
+ deleted = 1
+ end
+
+ redis.call('SREM', setIndexCacheKey, member)
+ redis.call('ZREM', zsetScoresIndexCacheKey, member)
+ return deleted
+ ";
+
+ var keys = new[]
+ {
+ redisHashCacheKey,
+ redisSetIndexCacheKey,
+ redisZSetScoresIndexCacheKey
+ };
+
+ var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
+
+ if ((int)result == 0)
+ {
+ _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberID}数据失败,-102");
+ }
+ }
+
+ ///
+ /// 修改缓存信息,映射关系未发生改变
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// ZSET索引缓存Key
+ /// 待修改缓存数据
+ ///
+ public async Task ModifyDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ T newData) where T : DeviceCacheBasicModel
+ {
+ if (newData == null
+ || string.IsNullOrWhiteSpace(redisHashCacheKey)
+ || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
+ || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
+ return;
+ }
+
+ var luaScript = @"
+ local hashCacheKey = KEYS[1]
+ local member = ARGV[1]
+ local newData = ARGV[2]
+
+ -- 校验存在性
+ if redis.call('HEXISTS', hashCacheKey, member) == 0 then
+ return 0
+ end
+
+ -- 更新主数据
+ redis.call('HSET', hashCacheKey, member, newData)
+
+ return 1
+ ";
+
+
+ var result = await Instance.EvalAsync(luaScript,
+ new[]
+ {
+ redisHashCacheKey
+ },
+ new object[]
+ {
+ newData.MemberID,
+ newData.Serialize()
+ });
+
+ if ((int)result == 0)
+ {
+ _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
+ }
+ }
+
+ ///
+ /// 修改缓存信息,映射关系已经改变
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// Set索引缓存Key
+ /// 旧的映射关系
+ /// ZSET索引缓存Key
+ /// 待修改缓存数据
+ ///
+ public async Task ModifyDataAsync(
+ string redisHashCacheKey,
+ string redisSetIndexCacheKey,
+ string oldMemberId,
+ string redisZSetScoresIndexCacheKey,
+ T newData) where T : DeviceCacheBasicModel
+ {
+ if (newData == null
+ || string.IsNullOrWhiteSpace(redisHashCacheKey)
+ || string.IsNullOrWhiteSpace(redisSetIndexCacheKey)
+ || string.IsNullOrWhiteSpace(oldMemberId)
+ || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101");
+ return;
+ }
+
+ var luaScript = @"
+ local hashCacheKey = KEYS[1]
+ local setIndexCacheKey = KEYS[2]
+ local zsetScoresIndexCacheKey = KEYS[3]
+ local member = ARGV[1]
+ local oldMember = ARGV[2]
+ local newData = ARGV[3]
+ local newScore = ARGV[4]
+
+ -- 校验存在性
+ if redis.call('HEXISTS', hashCacheKey, oldMember) == 0 then
+ return 0
+ end
+
+ -- 删除旧数据
+ redis.call('HDEL', hashCacheKey, oldMember)
+
+ -- 插入新主数据
+ redis.call('HSET', hashCacheKey, member, newData)
+
+ -- 处理变更
+ if newScore ~= '' then
+ -- 删除旧索引
+ redis.call('SREM', setIndexCacheKey, oldMember)
+ redis.call('ZREM', zsetScoresIndexCacheKey, oldMember)
+
+ -- 添加新索引
+ redis.call('SADD', setIndexCacheKey, member)
+ redis.call('ZADD', zsetScoresIndexCacheKey, newScore, member)
+ end
+
+ return 1
+ ";
+
+ var result = await Instance.EvalAsync(luaScript,
+ new[]
+ {
+ redisHashCacheKey,
+ redisSetIndexCacheKey,
+ redisZSetScoresIndexCacheKey
+ },
+ new object[]
+ {
+ newData.MemberID,
+ oldMemberId,
+ newData.Serialize(),
+ newData.ScoreValue.ToString() ?? "",
+ });
+
+ if ((int)result == 0)
+ {
+ _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberID}数据失败,-102");
+ }
+ }
+
+
+ ///
+ /// 通过集中器与表计信息排序索引获取指定集中器号集合数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// 集中器与表计信息排序索引ZSET缓存Key
+ /// 集中器Id
+ /// 分页尺寸
+ /// 最后一个索引
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ public async Task> GetPagedData(
+ string redisCacheKey,
+ string redisCacheFocusScoresIndexKey,
+ IEnumerable focusIds,
+ int pageSize = 10,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel
+ {
+ throw new Exception();
+ }
+
+ ///
+ /// 通过ZSET索引获取数据
+ ///
+ ///
+ /// 主数据存储Hash缓存Key
+ /// ZSET索引缓存Key
+ /// 分页尺寸
+ /// 最后一个索引
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ public async Task> GetAllPagedData(
+ string redisHashCacheKey,
+ string redisZSetScoresIndexCacheKey,
+ int pageSize = 1000,
+ decimal? lastScore = null,
+ string lastMember = null,
+ bool descending = true)
+ where T : DeviceCacheBasicModel
+ {
+ // 参数校验(保持不变)
+ if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ {
+ _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
+ return null;
+ }
+
+ if (pageSize < 1 || pageSize > 10000)
+ {
+ _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
+ return null;
+ }
+
+ var luaScript = @"
+ local command = ARGV[1]
+ local range_start = ARGV[2]
+ local range_end = ARGV[3]
+ local limit = tonumber(ARGV[4])
+ local last_score = ARGV[5]
+ local last_member = ARGV[6]
+
+ -- 处理相同分数下的字典序分页
+ if last_score ~= '' and last_member ~= '' then
+ if command == 'ZRANGEBYSCORE' then
+ range_start = '(' .. last_score
+ range_end = '(' .. last_score .. ' ' .. last_member
+ else
+ range_start = '(' .. last_score .. ' ' .. last_member
+ range_end = '(' .. last_score
+ end
+ end
+
+ -- 执行范围查询
+ local members
+ if command == 'ZRANGEBYSCORE' then
+ members = redis.call(command, KEYS[1], range_start, range_end,
+ 'WITHSCORES', 'LIMIT', 0, limit)
+ else
+ members = redis.call(command, KEYS[1], range_end, range_start,
+ 'WITHSCORES', 'LIMIT', 0, limit)
+ end
+
+ -- 提取成员和分数
+ local result_members = {}
+ local result_scores = {}
+ for i = 1, #members, 2 do
+ table.insert(result_members, members[i])
+ table.insert(result_scores, members[i+1])
+ end
+
+ -- 获取Hash数据
+ local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members))
+
+ return {
+ #result_members,
+ result_members,
+ result_scores,
+ hash_data
+ }";
+
+ //正确设置范围参数
+ string rangeStart, rangeEnd;
+ if (descending)
+ {
+ rangeStart = lastScore.HasValue ? $"({lastScore}" : "+inf";
+ rangeEnd = "-inf"; // 降序时固定为最小值
+ }
+ else
+ {
+ rangeStart = lastScore.HasValue ? $"({lastScore}" : "-inf";
+ rangeEnd = "+inf"; // 升序时固定为最大值
+ }
+
+ var result = (object[])await Instance.EvalAsync(
+ luaScript,
+ new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey },
+ new object[]
+ {
+ descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE",
+ rangeStart,
+ rangeEnd,
+ (pageSize + 1).ToString(), // 多取1条用于判断hasNext
+ lastScore?.ToString() ?? "",
+ lastMember ?? ""
+ });
+
+ if ((long)result[0] == 0)
+ return new BusCacheGlobalPagedResult { Items = new List() };
+
+ // 处理结果集
+ var members = ((object[])result[1]).Cast().ToList();
+ var scores = ((object[])result[2]).Cast().Select(decimal.Parse).ToList();
+ var hashData = ((object[])result[3]).Cast().ToList();
+
+ //合并有效数据并处理游标
+ var validItems = members.Zip(hashData, (m, h) =>
+ !string.IsNullOrWhiteSpace(h) ? BusJsonSerializer.Deserialize(h) : null)
+ .Where(x => x != null)
+ .Take(pageSize + 1)
+ .ToList();
+
+ var hasNext = validItems.Count > pageSize;
+ var actualItems = hasNext ? validItems.Take(pageSize) : validItems;
+
+ // 计算下一页起始点
+ string nextMember = null;
+ decimal? nextScore = null;
+ if (hasNext)
+ {
+ // 获取实际返回的最后一条有效数据
+ var lastValidIndex = actualItems.Count() - 1;
+ nextMember = members[lastValidIndex];
+ nextScore = scores[lastValidIndex];
+ }
+
+ return new BusCacheGlobalPagedResult
+ {
+ Items = actualItems.ToList(),
+ HasNext = hasNext,
+ NextScore = nextScore,
+ NextMember = nextMember,
+ TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
+ PageSize = pageSize,
+ };
+ }
+
+ /////
+ ///// 通过集中器与表计信息排序索引获取数据
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// ZSET索引缓存Key
+ ///// 分页尺寸
+ ///// 最后一个索引
+ ///// 最后一个唯一标识
+ ///// 排序方式
+ /////
+ //public async Task> GetAllPagedData(
+ //string redisHashCacheKey,
+ //string redisZSetScoresIndexCacheKey,
+ //int pageSize = 1000,
+ //decimal? lastScore = null,
+ //string lastMember = null,
+ //bool descending = true)
+ //where T : DeviceCacheBasicModel
+ //{
+ // // 参数校验增强
+ // if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey))
+ // {
+ // _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101");
+ // return null;
+ // }
+
+ // if (pageSize < 1 || pageSize > 10000)
+ // {
+ // _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102");
+ // return null;
+ // }
+
+ // //// 分页参数解析
+ // //var (startScore, excludeMember) = descending
+ // // ? (lastScore ?? decimal.MaxValue, lastMember)
+ // // : (lastScore ?? 0, lastMember);
+
+ // //执行分页查询(整合游标处理)
+ // var pageResult = await GetPagedMembers(
+ // redisZSetScoresIndexCacheKey,
+ // pageSize,
+ // lastScore,
+ // lastMember,
+ // descending);
+
+ // // 批量获取数据(优化内存分配)
+ // var dataDict = await BatchGetData(redisHashCacheKey, pageResult.Members);
+
+ // return new BusCacheGlobalPagedResult
+ // {
+ // Items = pageResult.Members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default)
+ // .Where(x => x != null).ToList(),
+ // HasNext = pageResult.HasNext,
+ // NextScore = pageResult.NextScore,
+ // NextMember = pageResult.NextMember,
+ // TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey),
+ // PageSize = pageSize,
+ // };
+ //}
+
+ ///
+ /// 游标分页查询
+ ///
+ /// 排序索引ZSET缓存Key
+ /// 分页数量
+ /// 上一个索引
+ /// 上一个标识
+ /// 排序方式
+ ///
+ private async Task<(List Members, bool HasNext, decimal? NextScore, string NextMember)> GetPagedMembers(
+ string redisZSetScoresIndexCacheKey,
+ int pageSize,
+ decimal? lastScore,
+ string lastMember,
+ bool descending)
+ {
+ // 根据排序方向初始化参数
+ long initialScore = descending ? long.MaxValue : 0;
+ decimal? currentScore = lastScore ?? initialScore;
+ string currentMember = lastMember;
+ var members = new List(pageSize + 1);
+
+ // 使用游标分页查询
+ while (members.Count < pageSize + 1 && currentScore.HasValue)
+ {
+ var (batch, hasMore) = await GetNextBatch(
+ redisZSetScoresIndexCacheKey,
+ pageSize + 1 - members.Count,
+ currentScore.Value,
+ currentMember,
+ descending);
+
+ if (!batch.Any()) break;
+
+ members.AddRange(batch);
+
+ // 更新游标
+ currentMember = batch.LastOrDefault();
+ currentScore = await GetNextScore(redisZSetScoresIndexCacheKey, currentMember, descending);
+ }
+
+ // 处理分页结果
+ bool hasNext = members.Count > pageSize;
+ var resultMembers = members.Take(pageSize).ToList();
+
+ return (
+ resultMembers,
+ hasNext,
+ currentScore,
+ currentMember
+ );
+ }
+
+ ///
+ /// 批量获取指定分页的数据
+ ///
+ ///
+ /// Hash表缓存key
+ /// Hash表字段集合
+ ///
+ private async Task> BatchGetData(
+ string redisHashCacheKey,
+ IEnumerable members)
+ where T : DeviceCacheBasicModel
+ {
+ using var pipe = Instance.StartPipe();
+
+ foreach (var member in members)
+ {
+ pipe.HGet(redisHashCacheKey, member);
+ }
+
+ var results = pipe.EndPipe();
+ return await Task.FromResult(members.Zip(results, (k, v) => new { k, v })
+ .ToDictionary(x => x.k, x => (T)x.v));
+ }
+
+ ///
+ /// 处理下一个分页数据
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ private async Task<(string[] Batch, bool HasMore)> GetNextBatch(
+ string zsetKey,
+ int limit,
+ decimal score,
+ string excludeMember,
+ bool descending)
+ {
+ var query = descending
+ ? await Instance.ZRevRangeByScoreAsync(
+ zsetKey,
+ max: score,
+ min: 0,
+ offset: 0,
+ count: limit)
+ : await Instance.ZRangeByScoreAsync(
+ zsetKey,
+ min: score,
+ max: long.MaxValue,
+ offset: 0,
+ count: limit);
+
+ return (query, query.Length >= limit);
+ }
+
+ ///
+ /// 获取下一页游标
+ ///
+ /// 排序索引ZSET缓存Key
+ /// 最后一个唯一标识
+ /// 排序方式
+ ///
+ private async Task GetNextScore(
+ string redisZSetScoresIndexCacheKey,
+ string lastMember,
+ bool descending)
+ {
+ if (string.IsNullOrEmpty(lastMember)) return null;
+
+ var score = await Instance.ZScoreAsync(redisZSetScoresIndexCacheKey, lastMember);
+ if (!score.HasValue) return null;
+
+ // 根据排序方向调整score
+ return descending
+ ? score.Value - 1 // 降序时下页查询小于当前score
+ : score.Value + 1; // 升序时下页查询大于当前score
+ }
+
+ ///
+ /// 获取指定ZSET区间内的总数量
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task GetCount(string zsetKey, long min, long max)
+ {
+ // 缓存计数优化
+ var cacheKey = $"{zsetKey}_count_{min}_{max}";
+ var cached = await Instance.GetAsync(cacheKey);
+
+ if (cached.HasValue)
+ return cached.Value;
+
+ var count = await Instance.ZCountAsync(zsetKey, min, max);
+ await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
+ return count;
+ }
+
+ ///
+ /// 获取指定ZSET的总数量
+ ///
+ ///
+ ///
+ private async Task GetTotalCount(string redisZSetScoresIndexCacheKey)
+ {
+ // 缓存计数优化
+ var cacheKey = $"{redisZSetScoresIndexCacheKey}_total_count";
+ var cached = await Instance.GetAsync(cacheKey);
+
+ if (cached.HasValue)
+ return cached.Value;
+
+ var count = await Instance.ZCountAsync(redisZSetScoresIndexCacheKey, 0, decimal.MaxValue);
+ await Instance.SetExAsync(cacheKey, 30, count); // 缓存30秒
+ return count;
+ }
+ }
+}
diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index e4c078d..3af4b6a 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -23,6 +23,9 @@ using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Kafka.Attributes;
using System.Text.Json;
using JiShe.CollectBus.Kafka;
+using JiShe.CollectBus.Application.Contracts;
+using JiShe.CollectBus.Common.Models;
+using System.Diagnostics;
namespace JiShe.CollectBus.Samples;
@@ -32,17 +35,23 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
private readonly IIoTDBProvider _iotDBProvider;
private readonly IoTDBRuntimeContext _dbContext;
private readonly IoTDBOptions _options;
+ private readonly IRedisDataCacheService _redisDataCacheService;
public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options,
- IoTDBRuntimeContext dbContext, ILogger logger)
+ IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService)
{
_iotDBProvider = iotDBProvider;
_options = options.Value;
_dbContext = dbContext;
_logger = logger;
+ _redisDataCacheService = redisDataCacheService;
}
-
+ ///
+ /// 测试 UseSessionPool
+ ///
+ ///
+ ///
[HttpGet]
public async Task UseSessionPool(long timestamps)
{
@@ -72,7 +81,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter);
}
-
+ ///
+ /// 测试Session切换
+ ///
+ ///
[HttpGet]
public async Task UseTableSessionPool()
{
@@ -125,7 +137,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
var timeDensity = "15";
//获取缓存中的电表信息
- var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
+ var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter);
@@ -178,6 +190,43 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
await _iotDBProvider.InsertAsync(meter);
}
+ ///
+ /// 测试单个测点数据项
+ ///
+ ///
+ [HttpGet]
+ public async Task TestRedisCacheGetAllPagedData()
+ {
+ var timeDensity = "15";
+ string SystemType = "";
+ string ServerTagName = "JiSheCollectBus2";
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}";
+ var timer = Stopwatch.StartNew();
+
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ List meterInfos = new List();
+ do
+ {
+ var page = await _redisDataCacheService
+ .GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp);
+
+ meterInfos.AddRange(page.Items);
+ cursor = page.NextScore;
+ member = page.NextMember;
+ hasNext = page.HasNext;
+ } while (hasNext);
+
+ timer.Stop();
+
+ _logger.LogInformation($"{nameof(TestRedisCacheGetAllPagedData)} 获取电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒");
+ }
+
public Task GetAsync()
{
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index 2fd9781..ce4635b 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,11 +1,13 @@
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
+using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
+using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds;
@@ -34,13 +36,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IIoTDBProvider _dbProvider;
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
-
+ private readonly IRedisDataCacheService _redisDataCacheService;
public BasicScheduledMeterReadingService(
ILogger logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
+ IRedisDataCacheService redisDataCacheService,
IIoTDBProvider dbProvider)
{
_producerBus = producerBus;
@@ -48,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService;
+ _redisDataCacheService = redisDataCacheService;
}
///
@@ -121,7 +125,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的表信息
- var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
+ var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meteryType, timeDensity)}*";
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
@@ -209,25 +213,51 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task InitAmmeterCacheData(string gatherCode = "")
{
#if DEBUG
- var timeDensity = "15";
- string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
- //获取缓存中的电表信息
- var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
+ //var timeDensity = "15";
+ //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}";
+ ////获取缓存中的电表信息
+ //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*";
- var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
- var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
- //List focusAddressDataLista = new List();
- List meterInfos = new List();
- foreach (var item in tempMeterInfos)
- {
- var tempData = item.Adapt();
- tempData.FocusId = item.FocusID;
- tempData.MeterId = item.Id;
- meterInfos.Add(tempData);
- //focusAddressDataLista.Add(item.FocusAddress);
- }
+ //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
+ //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter);
+ ////List focusAddressDataLista = new List();
+ //List meterInfos = new List();
+ //foreach (var item in tempMeterInfos)
+ //{
+ // var tempData = item.Adapt();
+ // tempData.FocusId = item.FocusID;
+ // tempData.MeterId = item.Id;
+ // meterInfos.Add(tempData);
+ // //focusAddressDataLista.Add(item.FocusAddress);
+ //}
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
+
+ var timeDensity = "15";
+ var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+ var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}";
+
+
+ decimal? cursor = null;
+ string member = null;
+ bool hasNext;
+ List meterInfos = new List();
+ do
+ {
+ var page = await _redisDataCacheService.GetAllPagedData(
+ redisCacheMeterInfoHashKeyTemp,
+ redisCacheMeterInfoZSetScoresIndexKeyTemp,
+ pageSize: 1000,
+ lastScore: cursor,
+ lastMember: member);
+
+ meterInfos.AddRange(page.Items);
+ cursor = page.HasNext ? page.NextScore : null;
+ member = page.HasNext ? page.NextMember : null;
+ hasNext = page.HasNext;
+ } while (hasNext);
+
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
#endif
@@ -251,10 +281,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
{
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheFocusIndexKey = $"{string.Format(RedisConst.CacheMeterInfoFocusIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
- var redisCacheGlobalIndexKey = $"{string.Format(RedisConst.CacheMeterInfoGlobalIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
+ var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}";
List ammeterInfos = new List();
//将表计信息根据集中器分组,获得集中器号
@@ -329,11 +358,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
- await FreeRedisProvider.BatchAddMeterData(
- redisCacheKey,
- redisCacheFocusIndexKey,
- redisCacheScoresIndexKey,
- redisCacheGlobalIndexKey, ammeterInfos);
+ await _redisDataCacheService.BatchInsertDataAsync(
+ redisCacheMeterInfoHashKey,
+ redisCacheMeterInfoSetIndexKey,
+ redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos);
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
@@ -635,7 +663,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
var currentTime = DateTime.Now;
var pendingCopyReadTime = currentTime.AddMinutes(timeDensity);
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
- var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}";
if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes))
{
@@ -897,7 +925,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var focusInfo in focusGroup)
{
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
- var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
foreach (var ammeterInfo in focusInfo.Value)
{
@@ -1113,7 +1141,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}";
Dictionary keyValuePairs = new Dictionary();
foreach (var subItem in item)
{
@@ -1260,7 +1288,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
{
- return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
+ return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*";
}
#endregion
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
index d44fe56..25c5476 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs
@@ -4,6 +4,7 @@ using System.Threading.Tasks;
using Confluent.Kafka;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
+using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
using JiShe.CollectBus.Common.Helpers;
@@ -35,8 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{
string serverTagName = string.Empty;
- public EnergySystemScheduledMeterReadingService(ILogger logger,
- ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider)
+ public EnergySystemScheduledMeterReadingService(
+ ILogger logger,
+ ICapPublisher producerBus, IIoTDBProvider dbProvider,
+ IMeterReadingRecordRepository meterReadingRecordRepository,
+ IConfiguration configuration,
+ IProducerService producerService,
+ IRedisDataCacheService redisDataCacheService)
+ : base(logger,
+ producerBus,
+ meterReadingRecordRepository,
+ producerService,
+ redisDataCacheService,
+ dbProvider)
{
serverTagName = configuration.GetValue(CommonConst.ServerTagName)!;
}
diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
index 9377056..dce5307 100644
--- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
+++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs
@@ -32,23 +32,18 @@ namespace JiShe.CollectBus.Common.Consts
///
/// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}";
+ public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}";
///
- /// 缓存表计信息集中器索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheMeterInfoFocusIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:FocusIndex:{"{3}"}";
+ public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}";
///
- /// 缓存表计信息集中器排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ /// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheMeterInfoScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ScoresIndex:{"{3}"}";
-
- ///
- /// 缓存表计信息集中器采集频率分组全局索引ZSet缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
- ///
- public const string CacheMeterInfoGlobalIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:GlobalIndex:{"{3}"}";
-
+ public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
+
public const string TaskInfo = "TaskInfo";
///
@@ -60,7 +55,17 @@ namespace JiShe.CollectBus.Common.Consts
///
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
///
- public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}";
+ public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}";
+
+ ///
+ /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ ///
+ public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}";
+
+ ///
+ /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
+ ///
+ public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}";
/////
///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
diff --git a/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
index 303b2e4..465bd15 100644
--- a/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
+++ b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs
@@ -22,6 +22,22 @@ namespace JiShe.CollectBus.Common.Models
///
public long TotalCount { get; set; }
+ ///
+ /// 每页条数
+ ///
+ public int PageSize { get; set; }
+
+ ///
+ /// 总页数
+ ///
+ public int PageCount
+ {
+ get
+ {
+ return (int)Math.Ceiling((double)TotalCount / PageSize);
+ }
+ }
+
///
/// 是否有下一页
///
diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
index 59011de..335c17c 100644
--- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
+++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs
@@ -22,8 +22,13 @@ namespace JiShe.CollectBus.Common.Models
public int MeterId { get; set; }
///
- /// 唯一标识,是redis ZSet和Set memberid
+ /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义
///
public virtual string MemberID => $"{FocusId}:{MeterId}";
+
+ ///
+ /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳
+ ///
+ public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId;
}
}
diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
index cebc9c4..d3a9bff 100644
--- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
+++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs
@@ -36,639 +36,472 @@ namespace JiShe.CollectBus.FreeRedisProvider
public IRedisClient GetInstance()
{
- var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}";
+ var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB},MaxPoolSize={_option.MaxPoolSize}";
Instance = new RedisClient(connectionString);
Instance.Serialize = obj => BusJsonSerializer.Serialize(obj);
Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type);
- Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
+ Instance.Notice += (s, e) => Trace.WriteLine(e.Log);
return Instance;
}
- ///
- /// 单个添加数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- /// 可选时间戳
- ///
- public async Task AddMeterCacheData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T data,
- DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
- {
- // 参数校验增强
- if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
- || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- {
- throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
- }
+ /////
+ ///// 单个添加数据
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// 集中器索引Set缓存Key
+ ///// 集中器排序索引ZSET缓存Key
+ ///// 集中器采集频率分组全局索引ZSet缓存Key
+ ///// 表计信息
+ ///// 可选时间戳
+ /////
+ //public async Task AddMeterCacheData(
+ //string redisCacheKey,
+ //string redisCacheFocusIndexKey,
+ //string redisCacheScoresIndexKey,
+ //string redisCacheGlobalIndexKey,
+ //T data,
+ //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
+ //{
+ // // 参数校验增强
+ // if (data == null || string.IsNullOrWhiteSpace(redisCacheKey)
+ // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
+ // {
+ // throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101");
+ // }
- // 计算组合score(分类ID + 时间戳)
- var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
+ // // 计算组合score(分类ID + 时间戳)
+ // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
- long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
+ // long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks;
- //全局索引写入
- long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
+ // //全局索引写入
+ // long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
- // 使用事务保证原子性
- using (var trans = Instance.Multi())
- {
- // 主数据存储Hash
- trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
+ // // 使用事务保证原子性
+ // using (var trans = Instance.Multi())
+ // {
+ // // 主数据存储Hash
+ // trans.HSet(redisCacheKey, data.MemberID, data.Serialize());
- // 分类索引
- trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
+ // // 分类索引
+ // trans.SAdd(redisCacheFocusIndexKey, data.MemberID);
- // 排序索引使用ZSET
- trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
+ // // 排序索引使用ZSET
+ // trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID);
- //全局索引
- trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
+ // //全局索引
+ // trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID);
- var results = trans.Exec();
+ // var results = trans.Exec();
- if (results == null || results.Length <= 0)
- throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
- }
+ // if (results == null || results.Length <= 0)
+ // throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102");
+ // }
- await Task.CompletedTask;
- }
+ // await Task.CompletedTask;
+ //}
- ///
- /// 批量添加数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 数据集合
- /// 可选时间戳
- ///
- public async Task BatchAddMeterData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- IEnumerable items,
- DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
- {
- if (items == null
- || items.Count() <=0
- || string.IsNullOrWhiteSpace(redisCacheKey)
- || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- {
- throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
- }
+ /////
+ ///// 批量添加数据
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// 集中器索引Set缓存Key
+ ///// 集中器排序索引ZSET缓存Key
+ ///// 集中器采集频率分组全局索引ZSet缓存Key
+ ///// 数据集合
+ ///// 可选时间戳
+ /////
+ //public async Task BatchAddMeterData(
+ //string redisCacheKey,
+ //string redisCacheFocusIndexKey,
+ //string redisCacheScoresIndexKey,
+ //string redisCacheGlobalIndexKey,
+ //IEnumerable items,
+ //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel
+ //{
+ // if (items == null
+ // || items.Count() <=0
+ // || string.IsNullOrWhiteSpace(redisCacheKey)
+ // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
+ // {
+ // throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101");
+ // }
- const int BATCH_SIZE = 1000; // 每批1000条
- var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
+ // const int BATCH_SIZE = 1000; // 每批1000条
+ // var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2);
- foreach (var batch in items.Batch(BATCH_SIZE))
- {
- await semaphore.WaitAsync();
+ // foreach (var batch in items.Batch(BATCH_SIZE))
+ // {
+ // await semaphore.WaitAsync();
- _ = Task.Run(() =>
- {
- using (var pipe = Instance.StartPipe())
- {
- foreach (var item in batch)
- {
- // 计算组合score(分类ID + 时间戳)
- var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
+ // _ = Task.Run(() =>
+ // {
+ // using (var pipe = Instance.StartPipe())
+ // {
+ // foreach (var item in batch)
+ // {
+ // // 计算组合score(分类ID + 时间戳)
+ // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow;
- long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
+ // long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks;
- //全局索引写入
- long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
+ // //全局索引写入
+ // long globalScore = actualTimestamp.ToUnixTimeMilliseconds();
- // 主数据存储Hash
- pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
+ // // 主数据存储Hash
+ // pipe.HSet(redisCacheKey, item.MemberID, item.Serialize());
- // 分类索引
- pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
+ // // 分类索引Set
+ // pipe.SAdd(redisCacheFocusIndexKey, item.MemberID);
- // 排序索引使用ZSET
- pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
+ // // 排序索引使用ZSET
+ // pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID);
- //全局索引
- pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
- }
- pipe.EndPipe();
- }
- semaphore.Release();
- });
- }
+ // //全局索引
+ // pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID);
+ // }
+ // pipe.EndPipe();
+ // }
+ // semaphore.Release();
+ // });
+ // }
- await Task.CompletedTask;
- }
+ // await Task.CompletedTask;
+ //}
- ///
- /// 删除指定redis缓存key的缓存数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- ///
- public async Task RemoveMeterData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T data) where T : DeviceCacheBasicModel
- {
+ /////
+ ///// 删除指定redis缓存key的缓存数据
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// 集中器索引Set缓存Key
+ ///// 集中器排序索引ZSET缓存Key
+ ///// 集中器采集频率分组全局索引ZSet缓存Key
+ ///// 表计信息
+ /////
+ //public async Task RemoveMeterData(
+ //string redisCacheKey,
+ //string redisCacheFocusIndexKey,
+ //string redisCacheScoresIndexKey,
+ //string redisCacheGlobalIndexKey,
+ //T data) where T : DeviceCacheBasicModel
+ //{
- if (data == null
- || string.IsNullOrWhiteSpace(redisCacheKey)
- || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- {
- throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
- }
+ // if (data == null
+ // || string.IsNullOrWhiteSpace(redisCacheKey)
+ // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
+ // {
+ // throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101");
+ // }
- const string luaScript = @"
- local mainKey = KEYS[1]
- local focusIndexKey = KEYS[2]
- local scoresIndexKey = KEYS[3]
- local globalIndexKey = KEYS[4]
- local member = ARGV[1]
+ // const string luaScript = @"
+ // local mainKey = KEYS[1]
+ // local focusIndexKey = KEYS[2]
+ // local scoresIndexKey = KEYS[3]
+ // local globalIndexKey = KEYS[4]
+ // local member = ARGV[1]
- local deleted = 0
- if redis.call('HDEL', mainKey, member) > 0 then
- deleted = 1
- end
+ // local deleted = 0
+ // if redis.call('HDEL', mainKey, member) > 0 then
+ // deleted = 1
+ // end
- redis.call('SREM', focusIndexKey, member)
- redis.call('ZREM', scoresIndexKey, member)
- redis.call('ZREM', globalIndexKey, member)
- return deleted
- ";
+ // redis.call('SREM', focusIndexKey, member)
+ // redis.call('ZREM', scoresIndexKey, member)
+ // redis.call('ZREM', globalIndexKey, member)
+ // return deleted
+ // ";
- var keys = new[]
- {
- redisCacheKey,
- redisCacheFocusIndexKey,
- redisCacheScoresIndexKey,
- redisCacheGlobalIndexKey
- };
+ // var keys = new[]
+ // {
+ // redisCacheKey,
+ // redisCacheFocusIndexKey,
+ // redisCacheScoresIndexKey,
+ // redisCacheGlobalIndexKey
+ // };
- var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
+ // var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID });
- if ((int)result == 0)
- throw new KeyNotFoundException("指定数据不存在");
- }
+ // if ((int)result == 0)
+ // throw new KeyNotFoundException("指定数据不存在");
+ //}
- ///
- /// 修改表计缓存信息
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 旧集中器索引Set缓存Key
- /// 新集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- /// 可选时间戳
- ///
- public async Task UpdateMeterData(
- string redisCacheKey,
- string oldRedisCacheFocusIndexKey,
- string newRedisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T newData,
- DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
- {
- if (newData == null
- || string.IsNullOrWhiteSpace(redisCacheKey)
- || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
- || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
- || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- {
- throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
- }
+ /////
+ ///// 修改表计缓存信息
+ /////
+ /////
+ ///// 主数据存储Hash缓存Key
+ ///// 旧集中器索引Set缓存Key
+ ///// 新集中器索引Set缓存Key
+ ///// 集中器排序索引ZSET缓存Key
+ ///// 集中器采集频率分组全局索引ZSet缓存Key
+ ///// 表计信息
+ ///// 可选时间戳
+ /////
+ //public async Task UpdateMeterData(
+ //string redisCacheKey,
+ //string oldRedisCacheFocusIndexKey,
+ //string newRedisCacheFocusIndexKey,
+ //string redisCacheScoresIndexKey,
+ //string redisCacheGlobalIndexKey,
+ //T newData,
+ //DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel
+ //{
+ // if (newData == null
+ // || string.IsNullOrWhiteSpace(redisCacheKey)
+ // || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey)
+ // || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey)
+ // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
+ // {
+ // throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101");
+ // }
- var luaScript = @"
- local mainKey = KEYS[1]
- local oldFocusIndexKey = KEYS[2]
- local newFocusIndexKey = KEYS[3]
- local scoresIndexKey = KEYS[4]
- local globalIndexKey = KEYS[5]
- local member = ARGV[1]
- local newData = ARGV[2]
- local newScore = ARGV[3]
- local newGlobalScore = ARGV[4]
+ // var luaScript = @"
+ // local mainKey = KEYS[1]
+ // local oldFocusIndexKey = KEYS[2]
+ // local newFocusIndexKey = KEYS[3]
+ // local scoresIndexKey = KEYS[4]
+ // local globalIndexKey = KEYS[5]
+ // local member = ARGV[1]
+ // local newData = ARGV[2]
+ // local newScore = ARGV[3]
+ // local newGlobalScore = ARGV[4]
- -- 校验存在性
- if redis.call('HEXISTS', mainKey, member) == 0 then
- return 0
- end
+ // -- 校验存在性
+ // if redis.call('HEXISTS', mainKey, member) == 0 then
+ // return 0
+ // end
- -- 更新主数据
- redis.call('HSET', mainKey, member, newData)
+ // -- 更新主数据
+ // redis.call('HSET', mainKey, member, newData)
- -- 处理变更
- if newScore ~= '' then
- -- 删除旧索引
- redis.call('SREM', oldFocusIndexKey, member)
- redis.call('ZREM', scoresIndexKey, member)
+ // -- 处理变更
+ // if newScore ~= '' then
+ // -- 删除旧索引
+ // redis.call('SREM', oldFocusIndexKey, member)
+ // redis.call('ZREM', scoresIndexKey, member)
- -- 添加新索引
- redis.call('SADD', newFocusIndexKey, member)
- redis.call('ZADD', scoresIndexKey, newScore, member)
- end
+ // -- 添加新索引
+ // redis.call('SADD', newFocusIndexKey, member)
+ // redis.call('ZADD', scoresIndexKey, newScore, member)
+ // end
- -- 更新全局索引
- if newGlobalScore ~= '' then
- -- 删除旧索引
- redis.call('ZREM', globalIndexKey, member)
+ // -- 更新全局索引
+ // if newGlobalScore ~= '' then
+ // -- 删除旧索引
+ // redis.call('ZREM', globalIndexKey, member)
- -- 添加新索引
- redis.call('ZADD', globalIndexKey, newGlobalScore, member)
- end
+ // -- 添加新索引
+ // redis.call('ZADD', globalIndexKey, newGlobalScore, member)
+ // end
- return 1
- ";
+ // return 1
+ // ";
- var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
- var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
- var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
+ // var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow;
+ // var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds();
+ // var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks;
- var result = await Instance.EvalAsync(luaScript,
- new[]
- {
- redisCacheKey,
- oldRedisCacheFocusIndexKey,
- newRedisCacheFocusIndexKey,
- redisCacheScoresIndexKey,
- redisCacheGlobalIndexKey
- },
- new object[]
- {
- newData.MemberID,
- newData.Serialize(),
- newScoreValue.ToString() ?? "",
- newGlobalScore.ToString() ?? ""
- });
+ // var result = await Instance.EvalAsync(luaScript,
+ // new[]
+ // {
+ // redisCacheKey,
+ // oldRedisCacheFocusIndexKey,
+ // newRedisCacheFocusIndexKey,
+ // redisCacheScoresIndexKey,
+ // redisCacheGlobalIndexKey
+ // },
+ // new object[]
+ // {
+ // newData.MemberID,
+ // newData.Serialize(),
+ // newScoreValue.ToString() ?? "",
+ // newGlobalScore.ToString() ?? ""
+ // });
- if ((int)result == 0)
- {
- throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
- }
- }
+ // if ((int)result == 0)
+ // {
+ // throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在");
+ // }
+ //}
- public async Task> SingleGetMeterPagedData(
- string redisCacheKey,
- string redisCacheScoresIndexKey,
- int focusId,
- int pageSize = 10,
- int pageIndex = 1,
- bool descending = true)
- {
- // 计算score范围
- long minScore = (long)focusId << 32;
- long maxScore = ((long)focusId + 1) << 32;
+ //public async Task> SingleGetMeterPagedData(
+ //string redisCacheKey,
+ //string redisCacheScoresIndexKey,
+ //int focusId,
+ //int pageSize = 10,
+ //int pageIndex = 1,
+ //bool descending = true)
+ //{
+ // // 计算score范围
+ // long minScore = (long)focusId << 32;
+ // long maxScore = ((long)focusId + 1) << 32;
- // 分页参数计算
- int start = (pageIndex - 1) * pageSize;
+ // // 分页参数计算
+ // int start = (pageIndex - 1) * pageSize;
- // 获取排序后的member列表
- var members = descending
- ? await Instance.ZRevRangeByScoreAsync(
- redisCacheScoresIndexKey,
- maxScore,
- minScore,
- start,
- pageSize)
- : await Instance.ZRangeByScoreAsync(
- redisCacheScoresIndexKey,
- minScore,
- maxScore,
- start,
- pageSize);
+ // // 获取排序后的member列表
+ // var members = descending
+ // ? await Instance.ZRevRangeByScoreAsync(
+ // redisCacheScoresIndexKey,
+ // maxScore,
+ // minScore,
+ // start,
+ // pageSize)
+ // : await Instance.ZRangeByScoreAsync(
+ // redisCacheScoresIndexKey,
+ // minScore,
+ // maxScore,
+ // start,
+ // pageSize);
- // 批量获取实际数据
- var dataTasks = members.Select(m =>
- Instance.HGetAsync(redisCacheKey, m)).ToArray();
- await Task.WhenAll(dataTasks);
+ // // 批量获取实际数据
+ // var dataTasks = members.Select(m =>
+ // Instance.HGetAsync(redisCacheKey, m)).ToArray();
+ // await Task.WhenAll(dataTasks);
- // 总数统计优化
- var total = await Instance.ZCountAsync(
- redisCacheScoresIndexKey,
- minScore,
- maxScore);
+ // // 总数统计优化
+ // var total = await Instance.ZCountAsync(
+ // redisCacheScoresIndexKey,
+ // minScore,
+ // maxScore);
- return new BusPagedResult
- {
- Items = dataTasks.Select(t => t.Result).ToList(),
- TotalCount = total,
- PageIndex = pageIndex,
- PageSize = pageSize
- };
- }
+ // return new BusPagedResult
+ // {
+ // Items = dataTasks.Select(t => t.Result).ToList(),
+ // TotalCount = total,
+ // PageIndex = pageIndex,
+ // PageSize = pageSize
+ // };
+ //}
- public async Task> GetFocusPagedData(
- string redisCacheKey,
- string redisCacheScoresIndexKey,
- int focusId,
- int pageSize = 10,
- long? lastScore = null,
- string lastMember = null,
- bool descending = true) where T : DeviceCacheBasicModel
- {
- // 计算分数范围
- long minScore = (long)focusId << 32;
- long maxScore = ((long)focusId + 1) << 32;
+ //public async Task> GetFocusPagedData(
+ //string redisCacheKey,
+ //string redisCacheScoresIndexKey,
+ //int focusId,
+ //int pageSize = 10,
+ //long? lastScore = null,
+ //string lastMember = null,
+ //bool descending = true) where T : DeviceCacheBasicModel
+ //{
+ // // 计算分数范围
+ // long minScore = (long)focusId << 32;
+ // long maxScore = ((long)focusId + 1) << 32;
- // 获取成员列表
- var members = await GetSortedMembers(
- redisCacheScoresIndexKey,
- minScore,
- maxScore,
- pageSize,
- lastScore,
- lastMember,
- descending);
+ // // 获取成员列表
+ // var members = await GetSortedMembers(
+ // redisCacheScoresIndexKey,
+ // minScore,
+ // maxScore,
+ // pageSize,
+ // lastScore,
+ // lastMember,
+ // descending);
- // 批量获取数据
- var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems);
+ // // 批量获取数据
+ // var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems);
- return new BusPagedResult
- {
- Items = dataDict,
- TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
- HasNext = members.HasNext,
- NextScore = members.NextScore,
- NextMember = members.NextMember
- };
- }
+ // return new BusPagedResult
+ // {
+ // Items = dataDict,
+ // TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore),
+ // HasNext = members.HasNext,
+ // NextScore = members.NextScore,
+ // NextMember = members.NextMember
+ // };
+ //}
- private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
- GetSortedMembers(
- string zsetKey,
- long minScore,
- long maxScore,
- int pageSize,
- long? lastScore,
- string lastMember,
- bool descending)
- {
- var querySize = pageSize + 1;
- var (startScore, exclude) = descending
- ? (lastScore ?? maxScore, lastMember)
- : (lastScore ?? minScore, lastMember);
+ //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)>
+ // GetSortedMembers(
+ // string zsetKey,
+ // long minScore,
+ // long maxScore,
+ // int pageSize,
+ // long? lastScore,
+ // string lastMember,
+ // bool descending)
+ //{
+ // var querySize = pageSize + 1;
+ // var (startScore, exclude) = descending
+ // ? (lastScore ?? maxScore, lastMember)
+ // : (lastScore ?? minScore, lastMember);
- var members = descending
- ? await Instance.ZRevRangeByScoreAsync(
- zsetKey,
- max: startScore,
- min: minScore,
- offset: 0,
- count: querySize)
- : await Instance.ZRangeByScoreAsync(
- zsetKey,
- min: startScore,
- max: maxScore,
- offset: 0,
- count: querySize);
+ // var members = descending
+ // ? await Instance.ZRevRangeByScoreAsync(
+ // zsetKey,
+ // max: startScore,
+ // min: minScore,
+ // offset: 0,
+ // count: querySize)
+ // : await Instance.ZRangeByScoreAsync(
+ // zsetKey,
+ // min: startScore,
+ // max: maxScore,
+ // offset: 0,
+ // count: querySize);
- var hasNext = members.Length > pageSize;
- var currentItems = members.Take(pageSize).ToArray();
+ // var hasNext = members.Length > pageSize;
+ // var currentItems = members.Take(pageSize).ToArray();
- var nextCursor = currentItems.Any()
- ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
- : (null, null);
+ // var nextCursor = currentItems.Any()
+ // ? await GetNextCursor(zsetKey, currentItems.Last(), descending)
+ // : (null, null);
- return (currentItems, hasNext, nextCursor.score, nextCursor.member);
- }
+ // return (currentItems, hasNext, nextCursor.score, nextCursor.member);
+ //}
- private async Task GetTotalCount(string zsetKey, long min, long max)
- {
- // 缓存计数优化
- var cacheKey = $"{zsetKey}_count_{min}_{max}";
- var cached = await Instance.GetAsync(cacheKey);
+ //private async Task GetTotalCount(string zsetKey, long min, long max)
+ //{
+ // // 缓存计数优化
+ // var cacheKey = $"{zsetKey}_count_{min}_{max}";
+ // var cached = await Instance.GetAsync(cacheKey);
- if (cached.HasValue)
- return cached.Value;
+ // if (cached.HasValue)
+ // return cached.Value;
- var count = await Instance.ZCountAsync(zsetKey, min, max);
- await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
- return count;
- }
+ // var count = await Instance.ZCountAsync(zsetKey, min, max);
+ // await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒
+ // return count;
+ //}
- public async Task>> BatchGetMeterPagedData(
- string redisCacheKey,
- string redisCacheScoresIndexKey,
- IEnumerable focusIds,
- int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
- {
- var results = new ConcurrentDictionary>();
- var parallelOptions = new ParallelOptions
- {
- MaxDegreeOfParallelism = Environment.ProcessorCount * 2
- };
+ //public async Task>> BatchGetMeterPagedData(
+ //string redisCacheKey,
+ //string redisCacheScoresIndexKey,
+ //IEnumerable focusIds,
+ //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel
+ //{
+ // var results = new ConcurrentDictionary>();
+ // var parallelOptions = new ParallelOptions
+ // {
+ // MaxDegreeOfParallelism = Environment.ProcessorCount * 2
+ // };
- await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
- {
- var data = await SingleGetMeterPagedData(
- redisCacheKey,
- redisCacheScoresIndexKey,
- focusId,
- pageSizePerFocus);
+ // await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) =>
+ // {
+ // var data = await SingleGetMeterPagedData(
+ // redisCacheKey,
+ // redisCacheScoresIndexKey,
+ // focusId,
+ // pageSizePerFocus);
- results.TryAdd(focusId, data);
- });
+ // results.TryAdd(focusId, data);
+ // });
+
+ // return new Dictionary>(results);
+ //}
+
- return new Dictionary>(results);
- }
- ///
- /// 通过全局索引分页查询表计缓存数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 分页尺寸
- /// 最后一个索引
- /// 最后一个唯一标识
- /// 排序方式
- ///
- public async Task> GetGlobalPagedData(
- string redisCacheKey,
- string redisCacheGlobalIndexKey,
- int pageSize = 10,
- decimal? lastScore = null,
- string lastMember = null,
- bool descending = true)
- where T : DeviceCacheBasicModel
- {
- // 参数校验增强
- if (string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey))
- {
- throw new ArgumentException($"{nameof(GetGlobalPagedData)} 参数异常,-101");
- }
-
- if (pageSize < 1 || pageSize > 1000)
- {
- throw new ArgumentException($"{nameof(GetGlobalPagedData)} 分页大小应在1-1000之间,-102");
- }
-
- // 分页参数解析
- var (startScore, excludeMember) = descending
- ? (lastScore ?? decimal.MaxValue, lastMember)
- : (lastScore ?? 0, lastMember);
-
- // 游标分页查询
- var (members, hasNext) = await GetPagedMembers(
- redisCacheGlobalIndexKey,
- pageSize,
- startScore,
- excludeMember,
- descending);
-
- // 批量获取数据(优化内存分配)
- var dataDict = await BatchGetData(redisCacheKey, members);
-
- // 获取下一页游标
- var nextCursor = members.Any()
- ? await GetNextCursor(redisCacheGlobalIndexKey, members.Last(), descending)
- : (null, null);
-
- return new BusCacheGlobalPagedResult
- {
- Items = members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default)
- .Where(x => x != null).ToList(),
- HasNext = hasNext,
- NextScore = nextCursor.score,
- NextMember = nextCursor.member
- };
- }
-
- ///
- /// 游标分页查询
- ///
- ///
- /// 分页数量
- /// 开始索引
- /// 开始唯一标识
- /// 排序方式
- ///
- private async Task<(List Members, bool HasNext)> GetPagedMembers(
- string redisCacheGlobalIndexKey,
- int pageSize,
- decimal? startScore,
- string excludeMember,
- bool descending)
- {
- const int bufferSize = 50; // 预读缓冲区大小
-
- // 使用流式分页(避免OFFSET性能问题)
- var members = new List(pageSize + 1);
- decimal? currentScore = startScore;
- string lastMember = excludeMember;
-
- while (members.Count < pageSize + 1 && currentScore.HasValue)
- {
- var querySize = Math.Min(bufferSize, pageSize + 1 - members.Count);
-
- var batch = descending
- ? await Instance.ZRevRangeByScoreAsync(
- redisCacheGlobalIndexKey,
- max: currentScore.Value,
- min: 0,
- offset: 0,
- count: querySize
- )
- : await Instance.ZRangeByScoreAsync(
- redisCacheGlobalIndexKey,
- min: currentScore.Value,
- max: long.MaxValue,
- offset: 0,
- count: querySize);
-
- if (!batch.Any()) break;
-
- members.AddRange(batch);
- lastMember = batch.LastOrDefault();
- currentScore = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember);
- }
-
- return (
- members.Take(pageSize).ToList(),
- members.Count > pageSize
- );
- }
-
- ///
- /// 批量获取指定分页的数据
- ///
- ///
- ///
- ///
- ///
- private async Task> BatchGetData(
- string hashKey,
- IEnumerable members)
- where T : DeviceCacheBasicModel
- {
- const int batchSize = 100;
- var result = new Dictionary();
-
- foreach (var batch in members.Batch(batchSize))
- {
- var batchArray = batch.ToArray();
- var values = await Instance.HMGetAsync(hashKey, batchArray);
-
- for (int i = 0; i < batchArray.Length; i++)
- {
- if (EqualityComparer.Default.Equals(values[i], default)) continue;
- result[batchArray[i]] = values[i];
- }
- }
-
- return result;
- }
-
- ///
- /// 获取下一页游标
- ///
- /// 全局索引Key
- /// 最后一个唯一标识
- /// 排序方式
- ///
- private async Task<(decimal? score, string member)> GetNextCursor(
- string redisCacheGlobalIndexKey,
- string lastMember,
- bool descending)
- {
- if (string.IsNullOrWhiteSpace(lastMember))
- {
- return (null, null);
- }
-
- var score = await Instance.ZScoreAsync(redisCacheGlobalIndexKey, lastMember);
- return score.HasValue
- ? (Convert.ToInt64(score.Value), lastMember)
- : (null, null);
- }
}
}
\ No newline at end of file
diff --git a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
index dc0aaa3..8fc6861 100644
--- a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
+++ b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs
@@ -9,105 +9,7 @@ namespace JiShe.CollectBus.FreeRedisProvider
/// 获取客户端
///
///
- RedisClient Instance { get; set; }
-
- ///
- /// 单个添加数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- /// 可选时间戳
- ///
- Task AddMeterCacheData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T data,
- DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
-
- ///
- /// 批量添加数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 数据集合
- /// 可选时间戳
- ///
- Task BatchAddMeterData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- IEnumerable items,
- DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel;
-
- ///
- /// 删除指定redis缓存key的缓存数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- ///
- Task RemoveMeterData(
- string redisCacheKey,
- string redisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T data) where T : DeviceCacheBasicModel;
-
- ///
- /// 修改表计缓存信息
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 旧集中器索引Set缓存Key
- /// 新集中器索引Set缓存Key
- /// 集中器排序索引ZSET缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 表计信息
- /// 可选时间戳
- ///
- Task UpdateMeterData(
- string redisCacheKey,
- string oldRedisCacheFocusIndexKey,
- string newRedisCacheFocusIndexKey,
- string redisCacheScoresIndexKey,
- string redisCacheGlobalIndexKey,
- T newData,
- DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel;
-
-
-
- ///
- /// 通过全局索引分页查询表计缓存数据
- ///
- ///
- /// 主数据存储Hash缓存Key
- /// 集中器采集频率分组全局索引ZSet缓存Key
- /// 分页尺寸
- /// 最后一个索引
- /// 最后一个唯一标识
- /// 排序方式
- ///
- Task> GetGlobalPagedData(
- string redisCacheKey,
- string redisCacheGlobalIndexKey,
- int pageSize = 10,
- decimal? lastScore = null,
- string lastMember = null,
- bool descending = true)
- where T : DeviceCacheBasicModel;
+ RedisClient Instance { get; set; }
}
}
diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs b/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs
index 75d92a2..2ed7e49 100644
--- a/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs
+++ b/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs
@@ -13,6 +13,11 @@ namespace JiShe.CollectBus.FreeRedisProvider.Options
///
public string? Configuration { get; set; }
+ ///
+ /// 最大连接数
+ ///
+ public string? MaxPoolSize { get; set; }
+
///
/// 默认数据库
///
diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json
index c922f8d..bb09204 100644
--- a/src/JiShe.CollectBus.Host/appsettings.json
+++ b/src/JiShe.CollectBus.Host/appsettings.json
@@ -41,6 +41,7 @@
},
"Redis": {
"Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
+ "MaxPoolSize": "50",
"DefaultDB": "14",
"HangfireDB": "15"
},
@@ -128,7 +129,7 @@
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
- "ServerTagName": "JiSheCollectBus2",
+ "ServerTagName": "JiSheCollectBus3",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
}
\ No newline at end of file