diff --git a/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs new file mode 100644 index 0000000..5f0e8f8 --- /dev/null +++ b/src/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -0,0 +1,187 @@ +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Application.Contracts +{ + /// + /// 数据缓存服务接口 + /// + public interface IRedisDataCacheService + { + /// + /// 单个添加数据 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待缓存数据 + /// + Task InsertDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T data) where T : DeviceCacheBasicModel; + + /// + /// 批量添加数据 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待缓存数据集合 + /// + Task BatchInsertDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + IEnumerable items) where T : DeviceCacheBasicModel; + + /// + /// 删除缓存信息 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 已缓存数据 + /// + Task RemoveCacheDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T data) where T : DeviceCacheBasicModel; + + /// + /// 修改缓存信息,映射关系未发生改变 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待修改缓存数据 + /// + Task ModifyDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T newData) where T : DeviceCacheBasicModel; + + + /// + /// 修改缓存信息,映射关系已改变 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// 旧的映射关系 + /// ZSET索引缓存Key + /// 待修改缓存数据 + /// + Task ModifyDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string oldMemberId, + string redisZSetScoresIndexCacheKey, + T newData) where T : DeviceCacheBasicModel; + + ///// + ///// 通过集中器与表计信息排序索引获取数据 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// ZSET索引缓存Key + ///// 分页尺寸 + ///// 最后一个索引 + ///// 最后一个唯一标识 + ///// 排序方式 + ///// + //Task> GetPagedData( + //string redisHashCacheKey, + //string redisZSetScoresIndexCacheKey, + //IEnumerable focusIds, + //int pageSize = 10, + //decimal? lastScore = null, + //string lastMember = null, + //bool descending = true) + //where T : DeviceCacheBasicModel; + + + /// + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 + /// + /// + /// 主数据存储Hash缓存Key + /// ZSET索引缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + Task> GetAllPagedData( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel; + + + /// + /// 优化后的分页获取方法(支持百万级数据) + /// + Task> GetAllPagedDataOptimized( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) where T : DeviceCacheBasicModel; + + ///// + ///// 游标分页查询 + ///// + ///// 排序索引ZSET缓存Key + ///// 分页数量 + ///// 开始索引 + ///// 开始唯一标识 + ///// 排序方式 + ///// + //Task<(List Members, bool HasNext)> GetPagedMembers( + // string redisZSetScoresIndexCacheKey, + // int pageSize, + // decimal? startScore, + // string excludeMember, + // bool descending); + + ///// + ///// 批量获取指定分页的数据 + ///// + ///// + ///// Hash表缓存key + ///// Hash表字段集合 + ///// + //Task> BatchGetData( + // string redisHashCacheKey, + // IEnumerable members) + // where T : DeviceCacheBasicModel; + + ///// + ///// 获取下一页游标 + ///// + ///// 排序索引ZSET缓存Key + ///// 最后一个唯一标识 + ///// 排序方式 + ///// + //Task GetNextScore( + // string redisZSetScoresIndexCacheKey, + // string lastMember, + // bool descending); + } +} diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs index 6ecc509..ad5b585 100644 --- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -79,7 +79,7 @@ public abstract class CollectBusAppService : ApplicationService { string key = (string)item[0]; object[] fieldsAndValues = (object[])item[1]; - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}"; string focusAddress = key.Replace(redisCacheKey, ""); var meterHashs = new Dictionary(); @@ -182,7 +182,7 @@ public abstract class CollectBusAppService : ApplicationService string key = (string)item[0]; object[] fieldsAndValues = (object[])item[1]; var redisCacheKey = string.Format( - RedisConst.CacheMeterInfoKey, + RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, diff --git a/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs new file mode 100644 index 0000000..3c96410 --- /dev/null +++ b/src/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -0,0 +1,1000 @@ +using Confluent.Kafka; +using FreeRedis; +using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.Common.Models; +using JiShe.CollectBus.FreeRedisProvider; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Volo.Abp.DependencyInjection; +using static FreeSql.Internal.GlobalFilter; +using static System.Runtime.InteropServices.JavaScript.JSType; +using static Volo.Abp.UI.Navigation.DefaultMenuNames.Application; + +namespace JiShe.CollectBus.RedisDataCache +{ + /// + /// 数据缓存服务接口 + /// + public class RedisDataCacheService : IRedisDataCacheService, ITransientDependency + { + private readonly IFreeRedisProvider _freeRedisProvider; + private readonly ILogger _logger; + private RedisClient Instance { get; set; } + + /// + /// 数据缓存服务接口 + /// + /// + /// + public RedisDataCacheService(IFreeRedisProvider freeRedisProvider, + ILogger logger) + { + this._freeRedisProvider = freeRedisProvider; + this._logger = logger; + + Instance = _freeRedisProvider.Instance; + } + + /// + /// 单个添加数据 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待缓存数据 + /// + public async Task InsertDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T data) where T : DeviceCacheBasicModel + { + // 参数校验增强 + if (data == null || string.IsNullOrWhiteSpace(redisHashCacheKey) + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(InsertDataAsync)} 参数异常,-101"); + return; + } + + // 使用事务保证原子性 + using (var trans = Instance.Multi()) + { + // 主数据存储Hash + trans.HSet(redisHashCacheKey, data.MemberId, data.Serialize()); + + // 集中器号分组索引Set缓存 + trans.SAdd(redisSetIndexCacheKey, data.MemberId); + + // 集中器与表计信息排序索引ZSET缓存Key + trans.ZAdd(redisZSetScoresIndexCacheKey, data.ScoreValue, data.MemberId); + + var results = trans.Exec(); + + if (results == null || results.Length <= 0) + { + _logger.LogError($"{nameof(InsertDataAsync)} 添加事务提交失败,-102"); + } + } + + await Task.CompletedTask; + } + + /// + /// 批量添加数据 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待缓存数据集合 + /// + public async Task BatchInsertDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + IEnumerable items) where T : DeviceCacheBasicModel + { + if (items == null + || items.Count() <= 0 + || string.IsNullOrWhiteSpace(redisHashCacheKey) + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101"); + return; + } + + const int BATCH_SIZE = 1000; // 每批1000条 + var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); + + foreach (var batch in items.Batch(BATCH_SIZE)) + { + await semaphore.WaitAsync(); + + _ = Task.Run(() => + { + using (var pipe = Instance.StartPipe()) + { + foreach (var item in batch) + { + // 主数据存储Hash + pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize()); + + // Set索引缓存 + pipe.SAdd(redisSetIndexCacheKey, item.MemberId); + + // ZSET索引缓存Key + pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId); + } + pipe.EndPipe(); + } + semaphore.Release(); + }); + } + + await Task.CompletedTask; + } + + /// + /// 删除缓存信息 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 已缓存数据 + /// + public async Task RemoveCacheDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T data) where T : DeviceCacheBasicModel + { + if (data == null + || string.IsNullOrWhiteSpace(redisHashCacheKey) + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(RemoveCacheDataAsync)} 参数异常,-101"); + return; + } + + const string luaScript = @" + local hashCacheKey = KEYS[1] + local setIndexCacheKey = KEYS[2] + local zsetScoresIndexCacheKey = KEYS[3] + local member = ARGV[1] + + local deleted = 0 + if redis.call('HDEL', hashCacheKey, member) > 0 then + deleted = 1 + end + + redis.call('SREM', setIndexCacheKey, member) + redis.call('ZREM', zsetScoresIndexCacheKey, member) + return deleted + "; + + var keys = new[] + { + redisHashCacheKey, + redisSetIndexCacheKey, + redisZSetScoresIndexCacheKey + }; + + var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberId }); + + if ((int)result == 0) + { + _logger.LogError($"{nameof(RemoveCacheDataAsync)} 删除指定Key{redisHashCacheKey}的{data.MemberId}数据失败,-102"); + } + } + + /// + /// 修改缓存信息,映射关系未发生改变 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// ZSET索引缓存Key + /// 待修改缓存数据 + /// + public async Task ModifyDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string redisZSetScoresIndexCacheKey, + T newData) where T : DeviceCacheBasicModel + { + if (newData == null + || string.IsNullOrWhiteSpace(redisHashCacheKey) + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101"); + return; + } + + var luaScript = @" + local hashCacheKey = KEYS[1] + local member = ARGV[1] + local newData = ARGV[2] + + -- 校验存在性 + if redis.call('HEXISTS', hashCacheKey, member) == 0 then + return 0 + end + + -- 更新主数据 + redis.call('HSET', hashCacheKey, member, newData) + + return 1 + "; + + + var result = await Instance.EvalAsync(luaScript, + new[] + { + redisHashCacheKey + }, + new object[] + { + newData.MemberId, + newData.Serialize() + }); + + if ((int)result == 0) + { + _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102"); + } + } + + /// + /// 修改缓存信息,映射关系已经改变 + /// + /// + /// 主数据存储Hash缓存Key + /// Set索引缓存Key + /// 旧的映射关系 + /// ZSET索引缓存Key + /// 待修改缓存数据 + /// + public async Task ModifyDataAsync( + string redisHashCacheKey, + string redisSetIndexCacheKey, + string oldMemberId, + string redisZSetScoresIndexCacheKey, + T newData) where T : DeviceCacheBasicModel + { + if (newData == null + || string.IsNullOrWhiteSpace(redisHashCacheKey) + || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) + || string.IsNullOrWhiteSpace(oldMemberId) + || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(ModifyDataAsync)} 参数异常,-101"); + return; + } + + var luaScript = @" + local hashCacheKey = KEYS[1] + local setIndexCacheKey = KEYS[2] + local zsetScoresIndexCacheKey = KEYS[3] + local member = ARGV[1] + local oldMember = ARGV[2] + local newData = ARGV[3] + local newScore = ARGV[4] + + -- 校验存在性 + if redis.call('HEXISTS', hashCacheKey, oldMember) == 0 then + return 0 + end + + -- 删除旧数据 + redis.call('HDEL', hashCacheKey, oldMember) + + -- 插入新主数据 + redis.call('HSET', hashCacheKey, member, newData) + + -- 处理变更 + if newScore ~= '' then + -- 删除旧索引 + redis.call('SREM', setIndexCacheKey, oldMember) + redis.call('ZREM', zsetScoresIndexCacheKey, oldMember) + + -- 添加新索引 + redis.call('SADD', setIndexCacheKey, member) + redis.call('ZADD', zsetScoresIndexCacheKey, newScore, member) + end + + return 1 + "; + + var result = await Instance.EvalAsync(luaScript, + new[] + { + redisHashCacheKey, + redisSetIndexCacheKey, + redisZSetScoresIndexCacheKey + }, + new object[] + { + newData.MemberId, + oldMemberId, + newData.Serialize(), + newData.ScoreValue.ToString() ?? "", + }); + + if ((int)result == 0) + { + _logger.LogError($"{nameof(ModifyDataAsync)} 更新指定Key{redisHashCacheKey}的{newData.MemberId}数据失败,-102"); + } + } + + + /// + /// 通过集中器与表计信息排序索引获取指定集中器号集合数据 + /// + /// + /// 主数据存储Hash缓存Key + /// 集中器与表计信息排序索引ZSET缓存Key + /// 集中器Id + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + public async Task> GetPagedData( + string redisCacheKey, + string redisCacheFocusScoresIndexKey, + IEnumerable focusIds, + int pageSize = 10, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel + { + throw new Exception(); + } + + + /// + /// 优化后的分页获取方法(支持百万级数据) + /// + public async Task> GetAllPagedDataOptimized( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) where T : DeviceCacheBasicModel + { + // 参数校验 + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || + string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError("Invalid parameters in {Method}", nameof(GetAllPagedDataOptimized)); + return new BusCacheGlobalPagedResult { Items = new List() }; + } + + pageSize = Math.Clamp(pageSize, 1, 10000); + + const string luaScript = @" + local command = ARGV[1] + local range_start = ARGV[2] + local range_end = ARGV[3] + local limit = tonumber(ARGV[4]) + local last_score = ARGV[5] + local last_member = ARGV[6] + + -- 获取扩展数据(3倍分页大小) + local members + if command == 'ZRANGEBYSCORE' then + members = redis.call('ZRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, + 'WITHSCORES', 'LIMIT', 0, limit * 5) + end + + -- 精确分页过滤 + local filtered = {} + local count = 0 + local start_index = 1 + + -- 存在锚点时寻找起始位置 + if last_score ~= '' and last_member ~= '' then + for i=1,#members,2 do + local score = members[i+1] + local member = members[i] + + if command == 'ZRANGEBYSCORE' then + if tonumber(score) > tonumber(last_score) then + start_index = i + break + elseif tonumber(score) == tonumber(last_score) then + if member > last_member then + start_index = i + break + end + end + else + if tonumber(score) < tonumber(last_score) then + start_index = i + break + elseif tonumber(score) == tonumber(last_score) then + if member < last_member then + start_index = i + break + end + end + end + end + end + + -- 收集有效数据 + for i=start_index,#members,2 do + if count >= limit then break end + table.insert(filtered, members[i]) + table.insert(filtered, members[i+1]) + count = count + 1 + end + + -- 提取有效数据 + local result_members = {} + local result_scores = {} + for i=1,#filtered,2 do + table.insert(result_members, filtered[i]) + table.insert(result_scores, filtered[i+1]) + end + + if #result_members == 0 then return {0,{},{},{}} end + + -- 获取Hash数据 + local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) + return {#result_members, result_members, result_scores, hash_data}"; + + // 构造查询范围(包含等于) + string rangeStart, rangeEnd; + if (descending) + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; + rangeEnd = "-inf"; + } + else + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; + rangeEnd = "+inf"; + } + + try + { + var scriptResult = (object[])await Instance.EvalAsync( + luaScript, + new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, + new object[] + { + descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", + rangeStart, + rangeEnd, + pageSize, + lastScore?.ToString() ?? "", + lastMember ?? "" + }); + + var itemCount = (long)scriptResult[0]; + if (itemCount == 0) + return new BusCacheGlobalPagedResult { Items = new List() }; + + // 处理结果集 + var members = ((object[])scriptResult[1]).Cast().ToList(); + var scores = ((object[])scriptResult[2]).Cast() + .Select(decimal.Parse).ToList(); + var hashData = ((object[])scriptResult[3]).Cast().ToList(); + + var validItems = members.AsParallel() + .Select((m, i) => + { + try { return BusJsonSerializer.Deserialize(hashData[i]); } + catch { return null; } + }) + .Where(x => x != null) + .ToList(); + + // 精确分页控制 + var hasNext = validItems.Count >= pageSize; + var actualItems = validItems.Take(pageSize).ToList(); + + // 计算下一页锚点(必须基于原始排序) + decimal? nextScore = null; + string nextMember = null; + if (hasNext && actualItems.Count > 0) + { + var lastValidIndex = Math.Min(pageSize - 1, members.Count - 1); + nextScore = scores[lastValidIndex]; + nextMember = members[lastValidIndex]; + } + + return new BusCacheGlobalPagedResult + { + Items = actualItems, + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember, + TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), + PageSize = pageSize + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "分页查询异常"); + return new BusCacheGlobalPagedResult { Items = new List() }; + } + } + + /// + /// 并行分页导出方法(百万级数据支持) + /// + public async Task> FullExportParallel( + string hashKey, + string zsetKey, + int parallelDegree = 10, + int pageSize = 5000) where T : DeviceCacheBasicModel + { + var result = new ConcurrentBag(); + var totalCount = await GetTotalCount(zsetKey); + var totalPages = (int)Math.Ceiling(totalCount / (double)pageSize); + + var semaphore = new SemaphoreSlim(parallelDegree); + var tasks = new List(); + + decimal? lastScore = null; + string lastMember = null; + var isDescending = true; + + for (int page = 0; page < totalPages; page++) + { + await semaphore.WaitAsync(); + + tasks.Add(Task.Run(async () => + { + try + { + var pageResult = await GetAllPagedData( + hashKey, + zsetKey, + pageSize, + lastScore, + lastMember, + isDescending); + + foreach (var item in pageResult.Items) + { + result.Add(item); + } + + // 更新分页锚点 + if (pageResult.HasNext) + { + lastScore = pageResult.NextScore; + lastMember = pageResult.NextMember; + } + } + finally + { + semaphore.Release(); + } + })); + } + + await Task.WhenAll(tasks); + return result.ToList(); + } + + + /// + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 + /// + /// + /// 主数据存储Hash缓存Key + /// ZSET索引缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + public async Task> GetAllPagedData( + string redisHashCacheKey, + string redisZSetScoresIndexCacheKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel + { + // 参数校验增强 + if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + { + _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); + return null; + } + + if (pageSize < 1 || pageSize > 10000) + { + _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102"); + return null; + } + + var luaScript = @" + local command = ARGV[1] + local range_start = ARGV[2] + local range_end = ARGV[3] + local limit = tonumber(ARGV[4]) + local last_score = ARGV[5] + local last_member = ARGV[6] + + -- 获取原始数据 + local members + if command == 'ZRANGEBYSCORE' then + members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) + else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) + end + + -- 过滤数据 + local filtered_members = {} + local count = 0 + for i = 1, #members, 2 do + local member = members[i] + local score = members[i+1] + local include = true + if last_score ~= '' and last_member ~= '' then + if command == 'ZRANGEBYSCORE' then + -- 升序:score > last_score 或 (score == last_score 且 member > last_member) + if score == last_score then + include = member > last_member + else + include = tonumber(score) > tonumber(last_score) + end + else + -- 降序:score < last_score 或 (score == last_score 且 member < last_member) + if score == last_score then + include = member < last_member + else + include = tonumber(score) < tonumber(last_score) + end + end + end + if include then + table.insert(filtered_members, member) + table.insert(filtered_members, score) + count = count + 1 + if count >= limit then + break + end + end + end + + -- 提取有效数据 + local result_members, result_scores = {}, {} + for i=1,#filtered_members,2 do + table.insert(result_members, filtered_members[i]) + table.insert(result_scores, filtered_members[i+1]) + end + + if #result_members == 0 then return {0,{},{},{}} end + + -- 获取Hash数据 + local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) + return {#result_members, result_members, result_scores, hash_data}"; + + // 调整范围构造逻辑(移除排他符号) + string rangeStart, rangeEnd; + if (descending) + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; + rangeEnd = "-inf"; + } + else + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; + rangeEnd = "+inf"; + } + + var scriptResult = (object[])await Instance.EvalAsync(luaScript, + new[] { redisZSetScoresIndexCacheKey, redisHashCacheKey }, + new object[] + { + descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", + rangeStart, + rangeEnd, + (pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页 + lastScore?.ToString() ?? "", + lastMember ?? "" + }); + + if ((long)scriptResult[0] == 0) + return new BusCacheGlobalPagedResult { Items = new List() }; + + // 处理结果集 + var members = ((object[])scriptResult[1]).Cast().ToList(); + var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList(); + var hashData = ((object[])scriptResult[3]).Cast().ToList(); + + var validItems = members.AsParallel() + .Select((m, i) => + { + try { return BusJsonSerializer.Deserialize(hashData[i]); } + catch { return null; } + }) + .Where(x => x != null) + .ToList(); + + var hasNext = validItems.Count > pageSize; + var actualItems = hasNext ? validItems.Take(pageSize) : validItems; + + //分页锚点索引 + decimal? nextScore = null; + string nextMember = null; + if (hasNext && actualItems.Any()) + { + var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引 + nextScore = scores[lastIndex]; + nextMember = members[lastIndex]; + } + + return new BusCacheGlobalPagedResult + { + Items = actualItems.ToList(), + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember, + TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), + PageSize = pageSize, + }; + } + + ///// + ///// 通过集中器与表计信息排序索引获取数据 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// ZSET索引缓存Key + ///// 分页尺寸 + ///// 最后一个索引 + ///// 最后一个唯一标识 + ///// 排序方式 + ///// + //public async Task> GetAllPagedData( + //string redisHashCacheKey, + //string redisZSetScoresIndexCacheKey, + //int pageSize = 1000, + //decimal? lastScore = null, + //string lastMember = null, + //bool descending = true) + //where T : DeviceCacheBasicModel + //{ + // // 参数校验增强 + // if (string.IsNullOrWhiteSpace(redisHashCacheKey) || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey)) + // { + // _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); + // return null; + // } + + // if (pageSize < 1 || pageSize > 10000) + // { + // _logger.LogError($"{nameof(GetAllPagedData)} 分页大小应在1-10000之间,-102"); + // return null; + // } + + // //// 分页参数解析 + // //var (startScore, excludeMember) = descending + // // ? (lastScore ?? decimal.MaxValue, lastMember) + // // : (lastScore ?? 0, lastMember); + + // //执行分页查询(整合游标处理) + // var pageResult = await GetPagedMembers( + // redisZSetScoresIndexCacheKey, + // pageSize, + // lastScore, + // lastMember, + // descending); + + // // 批量获取数据(优化内存分配) + // var dataDict = await BatchGetData(redisHashCacheKey, pageResult.Members); + + // return new BusCacheGlobalPagedResult + // { + // Items = pageResult.Members.Select(m => dataDict.TryGetValue(m, out var v) ? v : default) + // .Where(x => x != null).ToList(), + // HasNext = pageResult.HasNext, + // NextScore = pageResult.NextScore, + // NextMember = pageResult.NextMember, + // TotalCount = await GetTotalCount(redisZSetScoresIndexCacheKey), + // PageSize = pageSize, + // }; + //} + + /// + /// 游标分页查询 + /// + /// 排序索引ZSET缓存Key + /// 分页数量 + /// 上一个索引 + /// 上一个标识 + /// 排序方式 + /// + private async Task<(List Members, bool HasNext, decimal? NextScore, string NextMember)> GetPagedMembers( + string redisZSetScoresIndexCacheKey, + int pageSize, + decimal? lastScore, + string lastMember, + bool descending) + { + // 根据排序方向初始化参数 + long initialScore = descending ? long.MaxValue : 0; + decimal? currentScore = lastScore ?? initialScore; + string currentMember = lastMember; + var members = new List(pageSize + 1); + + // 使用游标分页查询 + while (members.Count < pageSize + 1 && currentScore.HasValue) + { + var (batch, hasMore) = await GetNextBatch( + redisZSetScoresIndexCacheKey, + pageSize + 1 - members.Count, + currentScore.Value, + currentMember, + descending); + + if (!batch.Any()) break; + + members.AddRange(batch); + + // 更新游标 + currentMember = batch.LastOrDefault(); + currentScore = await GetNextScore(redisZSetScoresIndexCacheKey, currentMember, descending); + } + + // 处理分页结果 + bool hasNext = members.Count > pageSize; + var resultMembers = members.Take(pageSize).ToList(); + + return ( + resultMembers, + hasNext, + currentScore, + currentMember + ); + } + + /// + /// 批量获取指定分页的数据 + /// + /// + /// Hash表缓存key + /// Hash表字段集合 + /// + private async Task> BatchGetData( + string redisHashCacheKey, + IEnumerable members) + where T : DeviceCacheBasicModel + { + using var pipe = Instance.StartPipe(); + + foreach (var member in members) + { + pipe.HGet(redisHashCacheKey, member); + } + + var results = pipe.EndPipe(); + return await Task.FromResult(members.Zip(results, (k, v) => new { k, v }) + .ToDictionary(x => x.k, x => (T)x.v)); + } + + /// + /// 处理下一个分页数据 + /// + /// + /// + /// + /// + /// + /// + private async Task<(string[] Batch, bool HasMore)> GetNextBatch( + string zsetKey, + int limit, + decimal score, + string excludeMember, + bool descending) + { + var query = descending + ? await Instance.ZRevRangeByScoreAsync( + zsetKey, + max: score, + min: 0, + offset: 0, + count: limit) + : await Instance.ZRangeByScoreAsync( + zsetKey, + min: score, + max: long.MaxValue, + offset: 0, + count: limit); + + return (query, query.Length >= limit); + } + + /// + /// 获取下一页游标 + /// + /// 排序索引ZSET缓存Key + /// 最后一个唯一标识 + /// 排序方式 + /// + private async Task GetNextScore( + string redisZSetScoresIndexCacheKey, + string lastMember, + bool descending) + { + if (string.IsNullOrEmpty(lastMember)) return null; + + var score = await Instance.ZScoreAsync(redisZSetScoresIndexCacheKey, lastMember); + if (!score.HasValue) return null; + + // 根据排序方向调整score + return descending + ? score.Value - 1 // 降序时下页查询小于当前score + : score.Value + 1; // 升序时下页查询大于当前score + } + + /// + /// 获取指定ZSET区间内的总数量 + /// + /// + /// + /// + /// + public async Task GetCount(string zsetKey, long min, long max) + { + // 缓存计数优化 + var cacheKey = $"{zsetKey}_count_{min}_{max}"; + var cached = await Instance.GetAsync(cacheKey); + + if (cached.HasValue) + return cached.Value; + + var count = await Instance.ZCountAsync(zsetKey, min, max); + await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒 + return count; + } + + /// + /// 获取指定ZSET的总数量 + /// + /// + /// + private async Task GetTotalCount(string redisZSetScoresIndexCacheKey) + { + // 缓存计数优化 + var cacheKey = $"{redisZSetScoresIndexCacheKey}_total_count"; + var cached = await Instance.GetAsync(cacheKey); + + if (cached.HasValue) + return cached.Value; + + var count = await Instance.ZCountAsync(redisZSetScoresIndexCacheKey, 0, decimal.MaxValue); + await Instance.SetExAsync(cacheKey, 30, count); // 缓存30秒 + return count; + } + } +} diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index d4c9a59..15b094a 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -23,6 +23,9 @@ using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Kafka.Attributes; using System.Text.Json; using JiShe.CollectBus.Kafka; +using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common.Models; +using System.Diagnostics; namespace JiShe.CollectBus.Samples; @@ -32,17 +35,23 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS private readonly IIoTDBProvider _iotDBProvider; private readonly IoTDBRuntimeContext _dbContext; private readonly IoTDBOptions _options; + private readonly IRedisDataCacheService _redisDataCacheService; public SampleAppService(IIoTDBProvider iotDBProvider, IOptions options, - IoTDBRuntimeContext dbContext, ILogger logger) + IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) { _iotDBProvider = iotDBProvider; _options = options.Value; _dbContext = dbContext; _logger = logger; + _redisDataCacheService = redisDataCacheService; } - + /// + /// 测试 UseSessionPool + /// + /// + /// [HttpGet] public async Task UseSessionPool(long timestamps) { @@ -72,7 +81,10 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await _iotDBProvider.InsertAsync(meter); } - + /// + /// 测试Session切换 + /// + /// [HttpGet] public async Task UseTableSessionPool() { @@ -125,7 +137,7 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS var timeDensity = "15"; //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); @@ -178,6 +190,44 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS await _iotDBProvider.InsertAsync(meter); } + /// + /// 测试Redis批量读取10万条数据性能 + /// + /// + [HttpGet] + public async Task TestRedisCacheGetAllPagedData() + { + var timeDensity = "15"; + string SystemType = "Energy"; + string ServerTagName = "JiSheCollectBus2"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + var timer1 = Stopwatch.StartNew(); + decimal? cursor = null; + string member = null; + bool hasNext; + List meterInfos = new List(); + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + + timer1.Stop(); + _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); + } + public Task GetAsync() { diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index d830707..eac2a92 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,11 +1,13 @@ using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; +using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.MessageIssueds; @@ -13,13 +15,16 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; +using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; +using Mapster; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -32,6 +37,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; + private readonly IRedisDataCacheService _redisDataCacheService; private readonly ICapPublisher _producerBus; public BasicScheduledMeterReadingService( @@ -39,6 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, + IRedisDataCacheService redisDataCacheService, IIoTDBProvider dbProvider) { _producerBus = producerBus; @@ -46,6 +53,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; _producerService = producerService; + _redisDataCacheService = redisDataCacheService; } /// @@ -115,32 +123,45 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); continue; } - - - - //获取缓存中的表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-104"); - return; - } - + var meterTypes = EnumExtensions.ToEnumDictionary(); if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); + var timer = Stopwatch.StartNew(); + + //获取对应频率中的所有电表信息 + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + + List meterInfos = new List(); + decimal? cursor = null; + string member = null; + bool hasNext; + do + { + var page = await _redisDataCacheService.GetAllPagedData( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: cursor, + lastMember: member); + + meterInfos.AddRange(page.Items); + cursor = page.HasNext ? page.NextScore : null; + member = page.HasNext ? page.NextMember : null; + hasNext = page.HasNext; + } while (hasNext); + if (meterInfos == null || meterInfos.Count <= 0) { + timer.Stop(); _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); return; } //await AmmerterScheduledMeterReadingIssued(timeDensity, meterInfos); - var timer = Stopwatch.StartNew(); //处理数据 //await DeviceGroupBalanceControl.ProcessGenericListAsync( @@ -157,14 +178,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading await DeviceGroupBalanceControl.ProcessWithThrottleAsync( items: meterInfos, deviceIdSelector: data => data.FocusAddress, - processor: data => + processor: (data,groupIndex) => { - _ = AmmerterCreatePublishTask(timeDensity, data); + _ = AmmerterCreatePublishTask(timeDensity, data, groupIndex,tasksToBeIssueModel.NextTaskTime.ToString("yyyyMMddHHmmss")); } ); timer.Stop(); - _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},{oneMinutekeyList.Length}"); + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成,{timer.ElapsedMilliseconds},总共{meterInfos.Count}表计信息"); } else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) @@ -207,25 +228,86 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task InitAmmeterCacheData(string gatherCode = "") { #if DEBUG + //var timeDensity = "15"; + //string tempCacheMeterInfoKey = $"CollectBus:{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}"; + ////获取缓存中的电表信息 + //var redisKeyList = $"{string.Format(tempCacheMeterInfoKey, SystemType, "JiSheCollectBus", MeterTypeEnum.Ammeter, timeDensity)}*"; + + //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + //var tempMeterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + ////List focusAddressDataLista = new List(); + //List meterInfos = new List(); + //foreach (var item in tempMeterInfos) + //{ + // var tempData = item.Adapt(); + // tempData.FocusId = item.FocusID; + // tempData.MeterId = item.Id; + // meterInfos.Add(tempData); + // //focusAddressDataLista.Add(item.FocusAddress); + //} + + + var timeDensity = "15"; - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; + var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, "JiSheCollectBus2", MeterTypeEnum.Ammeter, timeDensity)}"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + List meterInfos = new List(); List focusAddressDataLista = new List(); - foreach (var item in meterInfos) - { - focusAddressDataLista.Add(item.FocusAddress); - } + var timer1 = Stopwatch.StartNew(); + //decimal? cursor = null; + //string member = null; + //bool hasNext; + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedDataOptimized( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - return; + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); + + var allIds = new HashSet(); + decimal? score = null; + string member = null; + + while (true) + { + var page = await _redisDataCacheService.GetAllPagedDataOptimized( + redisCacheMeterInfoHashKeyTemp, + redisCacheMeterInfoZSetScoresIndexKeyTemp, + pageSize: 1000, + lastScore: score, + lastMember: member); + + meterInfos.AddRange(page.Items); + focusAddressDataLista.AddRange(page.Items.Select(d=>d.FocusAddress)); + foreach (var item in page.Items) + { + if (!allIds.Add(item.MemberId)) + throw new Exception("Duplicate data found!"); + } + if (!page.HasNext) break; + score = page.NextScore; + member = page.NextMember; + } + + + timer1.Stop(); + _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); #endif - if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); @@ -237,6 +319,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); } + var timer = Stopwatch.StartNew(); List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 @@ -244,6 +327,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) { + var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; + + List ammeterInfos = new List(); //将表计信息根据集中器分组,获得集中器号 var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); foreach (var item in meterInfoGroup) @@ -255,17 +343,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading focusAddressDataList.Add(item.Key); - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; + // var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG //每次缓存时,删除缓存,避免缓存数据有不准确的问题 //await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #else //每次缓存时,删除缓存,避免缓存数据有不准确的问题 - await FreeRedisProvider.Instance.DelAsync(redisCacheKey); + //await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #endif - Dictionary keyValuePairs = new Dictionary(); + //Dictionary keyValuePairs = new Dictionary(); foreach (var ammeter in item) { //处理ItemCode @@ -310,11 +398,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - keyValuePairs.TryAdd($"{ammeter.ID}", ammeter); + ammeterInfos.Add(ammeter); + //keyValuePairs.TryAdd($"{ammeter.MeterId}", ammeter); } - await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); + //await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheMeterInfoHashKey, + redisCacheMeterInfoSetIndexKey, + redisCacheMeterInfoZSetScoresIndexKey,ammeterInfos); + //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { @@ -337,7 +431,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading DeviceGroupBalanceControl.InitializeCache(focusAddressDataList); } - _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); + timer.Stop(); + + _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒"); } /// @@ -608,9 +704,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集频率 /// 集中器号hash分组的集中器集合数据 + /// 集中器所在分组 + /// 时间格式的任务批次名称 /// private async Task AmmerterCreatePublishTask(int timeDensity - , AmmeterInfo ammeterInfo) + , AmmeterInfo ammeterInfo,int groupIndex,string taskBatch) { var handlerPacketBuilder = TelemetryPacketBuilder.AFNHandlersDictionary; //todo 检查需要待补抄的电表的时间点信息,保存到需要待补抄的缓存中。如果此线程异常,该如何补偿? @@ -618,7 +716,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading var currentTime = DateTime.Now; var pendingCopyReadTime = currentTime.AddMinutes(timeDensity); //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{ammeterInfo.FocusAddress}"; + var redisCacheTelemetryPacketInfoHashKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoSetIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; + var redisCacheTelemetryPacketInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity, groupIndex, taskBatch)}"; if (string.IsNullOrWhiteSpace(ammeterInfo.ItemCodes)) { @@ -699,7 +799,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } } - Dictionary keyValuePairs = new Dictionary(); + //Dictionary keyValuePairs = new Dictionary(); + List taskList = new List(); foreach (var tempItem in tempCodes) { @@ -754,17 +855,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading - var meterReadingRecords = new MeterReadingRecords() + var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { ProjectID = ammeterInfo.ProjectID, DatabaseBusiID = ammeterInfo.DatabaseBusiID, PendingCopyReadTime = pendingCopyReadTime, CreationTime = currentTime, MeterAddress = ammeterInfo.AmmerterAddress, - MeterId = ammeterInfo.ID, + MeterId = ammeterInfo.MeterId, MeterType = MeterTypeEnum.Ammeter, FocusAddress = ammeterInfo.FocusAddress, - FocusID = ammeterInfo.FocusID, + FocusId = ammeterInfo.FocusId, AFN = aFN, Fn = fn, ItemCode = tempItem, @@ -774,9 +875,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(dataInfos), }; - meterReadingRecords.CreateDataId(GuidGenerator.Create()); - keyValuePairs.TryAdd($"{ammeterInfo.ID}_{tempItem}", meterReadingRecords); + //meterReadingRecords.CreateDataId(GuidGenerator.Create()); + + taskList.Add(meterReadingRecords); } //TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); //await Task.Delay(timeSpan); @@ -784,14 +886,25 @@ namespace JiShe.CollectBus.ScheduledMeterReading //return keyValuePairs; // await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); - using (var pipe = FreeRedisProvider.Instance.StartPipe()) + //using (var pipe = FreeRedisProvider.Instance.StartPipe()) + //{ + // pipe.HSet(redisCacheKey, keyValuePairs); + // object[] ret = pipe.EndPipe(); + //} + if (taskList == null + || taskList.Count() <= 0 + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoHashKey) + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoSetIndexKey) + || string.IsNullOrWhiteSpace(redisCacheTelemetryPacketInfoZSetScoresIndexKey)) { - pipe.HSet(redisCacheKey, keyValuePairs); - object[] ret = pipe.EndPipe(); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 写入参数异常,{redisCacheTelemetryPacketInfoHashKey}:{redisCacheTelemetryPacketInfoSetIndexKey}:{redisCacheTelemetryPacketInfoZSetScoresIndexKey},-101"); + return; } - - - await Task.CompletedTask; + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheTelemetryPacketInfoHashKey, + redisCacheTelemetryPacketInfoSetIndexKey, + redisCacheTelemetryPacketInfoZSetScoresIndexKey, + taskList); } /// @@ -881,7 +994,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading foreach (var focusInfo in focusGroup) { //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; foreach (var ammeterInfo in focusInfo.Value) { @@ -915,22 +1028,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (string.IsNullOrWhiteSpace(ammeter.AreaCode)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信区号为空"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信区号为空"); continue; } if (string.IsNullOrWhiteSpace(ammeter.Address)) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址为空"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址为空"); continue; } if (Convert.ToInt32(ammeter.Address) > 65535) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},集中器通信地址无效,确保大于65535"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},集中器通信地址无效,确保大于65535"); continue; } if (ammeter.MeteringCode <= 0 || ammeter.MeteringCode > 2033) { - _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.ID},非有效测量点号({ammeter.MeteringCode})"); + _logger.LogError($"{nameof(AmmerterCreatePublishTask)} 表ID:{ammeter.MeterId},非有效测量点号({ammeter.MeteringCode})"); continue; } @@ -1028,10 +1141,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading PendingCopyReadTime = pendingCopyReadTime, CreationTime = currentTime, MeterAddress = ammeter.AmmerterAddress, - MeterId = ammeter.ID, + MeterId = ammeter.MeterId, MeterType = MeterTypeEnum.Ammeter, FocusAddress = ammeter.FocusAddress, - FocusID = ammeter.FocusID, + FocusID = ammeter.FocusId, AFN = aFN, Fn = fn, ItemCode = tempItem, @@ -1041,9 +1154,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading IssuedMessageId = GuidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(dataInfos), }; - meterReadingRecords.CreateDataId(GuidGenerator.Create()); + //meterReadingRecords.CreateDataId(GuidGenerator.Create()); - keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords); + keyValuePairs.TryAdd($"{ammeter.MeterId}_{tempItem}", meterReadingRecords); } await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } @@ -1097,12 +1210,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, itemTimeDensity.Key)}{item.Key}"; Dictionary keyValuePairs = new Dictionary(); foreach (var subItem in item) { - keyValuePairs.TryAdd($"{subItem.ID}", subItem); + keyValuePairs.TryAdd($"{subItem.MeterId}", subItem); } await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } @@ -1245,7 +1358,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType) { - return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*"; + return $"{string.Format(RedisConst.CacheTelemetryPacketInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}*"; } #endregion diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 3b0da49..25c5476 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using Confluent.Kafka; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Application.Contracts; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; using JiShe.CollectBus.Common.Helpers; @@ -35,8 +36,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { string serverTagName = string.Empty; - public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) + public EnergySystemScheduledMeterReadingService( + ILogger logger, + ICapPublisher producerBus, IIoTDBProvider dbProvider, + IMeterReadingRecordRepository meterReadingRecordRepository, + IConfiguration configuration, + IProducerService producerService, + IRedisDataCacheService redisDataCacheService) + : base(logger, + producerBus, + meterReadingRecordRepository, + producerService, + redisDataCacheService, + dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } @@ -80,11 +92,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading // Baudrate = 2400, // FocusAddress = "402440506", // Name = "张家祠工务(三相电表)", - // FocusID = 95780, + // FocusId = 95780, // DatabaseBusiID = 1, // MeteringCode = 1, // AmmerterAddress = "402410040506", - // ID = 127035, + // MeterId = 127035, // TypeName = 3, // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", // TimeDensity = 15, @@ -94,11 +106,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading // Baudrate = 2400, // FocusAddress = "542400504", // Name = "五号配(长芦二所四排)(单相电表)", - // FocusID = 69280, + // FocusId = 69280, // DatabaseBusiID = 1, // MeteringCode = 2, // AmmerterAddress = "542410000504", - // ID = 95594, + // MeterId = 95594, // TypeName = 1, // DataTypes = "581,589,592,597,601", // TimeDensity = 15, @@ -106,7 +118,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //return ammeterInfos; - string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID + string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID FROM TB_GatherInfo(NOLOCK) AS A INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 @@ -133,9 +145,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") { string sql = $@"SELECT - A.ID, + A.ID as MeterId, A.Name, - A.FocusID, + A.FocusID as FocusId, A.MeteringCode, A.Baudrate, A.MeteringPort, diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs index ea4323b..7ac170b 100644 --- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -28,25 +28,49 @@ namespace JiShe.CollectBus.Common.Consts /// public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; + public const string MeterInfo = "MeterInfo"; /// /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}:"; + public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}"; + /// + /// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// + public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}"; + + /// + /// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// + public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}"; + + + public const string TaskInfo = "TaskInfo"; /// /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TaskInfo:{"{2}"}:{"{3}"}"; + public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TaskInfo}:{"{2}"}:{"{3}"}"; + + public const string TelemetryPacket = "TelemetryPacket"; + /// + /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 + /// + public const string CacheTelemetryPacketInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:{"{3}"}:{"{4}"}:{"{5}"}"; /// - /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// 缓存表计下发指令数据集索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 /// - public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:"; + public const string CacheTelemetryPacketInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:SetIndex:{"{3}"}:{"{4}"}:{"{5}"}"; /// - /// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记 + /// 缓存表计下发指令数据集排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率,{4}=>集中器所在分组,{5}=>时间格式的任务批次 /// - public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap"; + public const string CacheTelemetryPacketInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{TelemetryPacket}:{"{2}"}:ZSetScoresIndex:{"{3}"}:{"{4}"}:{"{5}"}"; + + ///// + ///// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记 + ///// + //public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap"; public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; } diff --git a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs index 4be0f4f..06b7d70 100644 --- a/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/DeviceBalanceControl/DeviceGroupBalanceControl.cs @@ -161,7 +161,6 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl MaxDegreeOfParallelism = maxThreads.Value, }; - TimeSpan timeSpan = TimeSpan.FromMicroseconds(5); await Task.Run(() => { Parallel.For(0, cache.CachedGroups.Length, options, async groupId => @@ -169,8 +168,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl var queue = groupQueues[groupId]; while (queue.TryDequeue(out T item)) { - await Task.Delay(timeSpan); - processor(item, Thread.CurrentThread.ManagedThreadId); + processor(item, groupId); } }); }); @@ -183,14 +181,14 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// 已经分组的设备信息 /// 部分或者全部的已经分组的设备集合 /// 从泛型对象提取deviceId - /// 处理委托(参数:当前对象,线程ID) + /// 处理委托(参数:当前对象,分组ID) /// 可选最佳并发度 /// /// public static async Task ProcessWithThrottleAsync( List items, Func deviceIdSelector, - Action processor, + Action processor, int? maxConcurrency = null) { var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化"); @@ -244,7 +242,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl /// /// 分组异步处理(带节流) /// - private static async Task ProcessItemAsync(T item, Action processor, int groupId) + private static async Task ProcessItemAsync(T item, Action processor, int groupId) { // 使用内存缓存降低CPU负载 await Task.Yield(); // 立即释放当前线程 @@ -255,7 +253,7 @@ namespace JiShe.CollectBus.Common.DeviceBalanceControl { ExecutionContext.Run(context!, state => { - processor(item); + processor(item,groupId); }, null); }); } diff --git a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs index 80d2f23..b17e9c2 100644 --- a/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/EnumerableExtensions.cs @@ -89,5 +89,26 @@ namespace JiShe.CollectBus.Common.Extensions if (buffer.Count > 0) yield return buffer; } + + //public static IEnumerable> Batch(this IEnumerable source, int batchSize) + //{ + // if (batchSize <= 0) + // throw new ArgumentOutOfRangeException(nameof(batchSize)); + + // using var enumerator = source.GetEnumerator(); + // while (enumerator.MoveNext()) + // { + // yield return GetBatch(enumerator, batchSize); + // } + //} + + //private static IEnumerable GetBatch(IEnumerator enumerator, int batchSize) + //{ + // do + // { + // yield return enumerator.Current; + // batchSize--; + // } while (batchSize > 0 && enumerator.MoveNext()); + //} } } diff --git a/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj b/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj index b22e8ca..05645ef 100644 --- a/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj +++ b/src/JiShe.CollectBus.Common/JiShe.CollectBus.Common.csproj @@ -17,6 +17,7 @@ + diff --git a/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs new file mode 100644 index 0000000..465bd15 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Models/BusCacheGlobalPagedResult.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Models +{ + /// + /// 缓存全局分页结果 + /// + /// + public class BusCacheGlobalPagedResult + { + /// + /// 数据集合 + /// + public List Items { get; set; } + + /// + /// 总条数 + /// + public long TotalCount { get; set; } + + /// + /// 每页条数 + /// + public int PageSize { get; set; } + + /// + /// 总页数 + /// + public int PageCount + { + get + { + return (int)Math.Ceiling((double)TotalCount / PageSize); + } + } + + /// + /// 是否有下一页 + /// + public bool HasNext { get; set; } + + /// + /// 下一页的分页索引 + /// + public decimal? NextScore { get; set; } + + /// + /// 下一页的分页索引 + /// + public string NextMember { get; set; } + } +} diff --git a/src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs deleted file mode 100644 index 6ae8b91..0000000 --- a/src/JiShe.CollectBus.Common/Models/BusGlobalPagedResult.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace JiShe.CollectBus.Common.Models -{ - public class GlobalPagedResult - { - /// - /// 数据集合 - /// - public List Items { get; set; } - - /// - /// 是否有下一页 - /// - public bool HasNext { get; set; } - - /// - /// 下一页的分页索引 - /// - public long? NextScore { get; set; } - - /// - /// 下一页的分页索引 - /// - public string NextMember { get; set; } - } -} diff --git a/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs b/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs index 2e68b14..f2bd1a3 100644 --- a/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs +++ b/src/JiShe.CollectBus.Common/Models/BusPagedResult.cs @@ -31,5 +31,20 @@ namespace JiShe.CollectBus.Common.Models /// 数据集合 /// public IEnumerable Items { get; set; } + + /// + /// 是否有下一页 + /// + public bool HasNext { get; set; } + + /// + /// 下一页的分页索引 + /// + public decimal? NextScore { get; set; } + + /// + /// 下一页的分页索引 + /// + public string NextMember { get; set; } } } diff --git a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index 05b654d..d397151 100644 --- a/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/src/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -9,7 +9,7 @@ namespace JiShe.CollectBus.Common.Models /// /// 设备缓存基础模型 /// - public class DeviceCacheBasicModel + public abstract class DeviceCacheBasicModel { /// /// 集中器Id @@ -20,5 +20,15 @@ namespace JiShe.CollectBus.Common.Models /// 表Id /// public int MeterId { get; set; } + + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + public virtual string MemberId => $"{FocusId}:{MeterId}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + public virtual long ScoreValue=> ((long)FocusId << 32) | (uint)MeterId; } } diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs index fa21071..8b082bb 100644 --- a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfo.cs @@ -1,4 +1,6 @@ -using System; +using FreeSql.DataAnnotations; +using JiShe.CollectBus.Common.Models; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -6,23 +8,25 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.Ammeters { - public class AmmeterInfo + public class AmmeterInfo: DeviceCacheBasicModel { /// - /// 电表ID + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 /// - public int ID { get; set; } + [Column(IsIgnore = true)] + public override string MemberId => $"{FocusId}:{MeterId}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + [Column(IsIgnore = true)] + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; /// /// 电表名称 /// public string Name { get; set; } - /// - /// 集中器ID - /// - public int FocusID { get; set; } - /// /// 集中器地址 /// diff --git a/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs new file mode 100644 index 0000000..fce33fa --- /dev/null +++ b/src/JiShe.CollectBus.Domain/Ammeters/AmmeterInfoTemp.cs @@ -0,0 +1,152 @@ +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Ammeters +{ + public class AmmeterInfoTemp + { + /// + /// 集中器Id + /// + public int FocusID { get; set; } + + /// + /// 表Id + /// + public int Id { get; set; } + + /// + /// 电表名称 + /// + public string Name { get; set; } + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set; } + + /// + /// 集中器地址 + /// + public string Address { get; set; } + + /// + /// 集中器区域代码 + /// + public string AreaCode { get; set; } + + /// + /// 电表类别 (1单相、2三相三线、3三相四线), + /// 07协议: 开合闸指令(1A开闸断电,1C单相表合闸,1B多相表合闸) 645 2007 表 + /// 97协议://true(合闸);false(跳闸) 545 1997 没有单相多相 之分 "true" ? "9966" : "3355" + /// + public int TypeName { get; set; } + + /// + /// 跳合闸状态字段: 0 合闸,1 跳闸 + /// 电表:TripState (0 合闸-通电, 1 断开、跳闸); + /// + public int TripState { get; set; } + + /// + /// 规约 -电表default(30) 1:97协议,30:07协议 + /// + public int? Protocol { get; set; } + + /// + /// 一个集中器下的[MeteringCode]必须唯一。 PN + /// + public int MeteringCode { get; set; } + + /// + /// 电表通信地址 + /// + public string AmmerterAddress { get; set; } + + /// + /// 波特率 default(2400) + /// + public int Baudrate { get; set; } + + /// + /// MeteringPort 端口就几个可以枚举。 + /// + public int MeteringPort { get; set; } + + /// + /// 电表密码 + /// + public string Password { get; set; } + + /// + /// 采集时间间隔(分钟,如15) + /// + public int TimeDensity { get; set; } + + /// + /// 该电表方案下采集项,JSON格式,如:["0D_80","0D_80"] + /// + public string ItemCodes { get; set; } + + /// + /// State表状态: + /// 0新装(未下发),1运行(档案下发成功时设置状态值1), 2暂停, 100销表(销表后是否重新启用) + /// 特定:State -1 已删除 + /// + public int State { get; set; } + + /// + /// 是否自动采集(0:主动采集,1:自动采集) + /// + public int AutomaticReport { get; set; } + + /// + /// 该电表方案下采集项编号 + /// + public string DataTypes { get; set; } + + /// + /// 品牌型号 + /// + public string BrandType { get; set; } + + /// + /// 采集器编号 + /// + public string GatherCode { get; set; } + + /// + /// 是否特殊表,1是特殊电表 + /// + public int Special { get; set; } + + /// + /// 费率类型,单、多 (SingleRate :单费率(单相表1),多费率(其他0) ,与TypeName字段无关) + /// SingleRate ? "单" : "复" + /// [SingleRate] --0 复费率 false , 1 单费率 true (与PayPlanID保持一致) + ///对应 TB_PayPlan.Type: 1复费率,2单费率 + /// + public bool SingleRate { get; set; } + + /// + /// 项目ID + /// + public int ProjectID { get; set; } + + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + + /// + /// 是否异常集中器 0:正常,1异常 + /// + public int AbnormalState { get; set; } + + public DateTime LastTime { get; set; } + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs new file mode 100644 index 0000000..c3f75d3 --- /dev/null +++ b/src/JiShe.CollectBus.Domain/IotSystems/MeterReadingRecords/MeterReadingTelemetryPacketInfo.cs @@ -0,0 +1,141 @@ +using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Models; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Domain.Entities; +using Volo.Abp.Domain.Entities.Auditing; + +namespace JiShe.CollectBus.IotSystems.MeterReadingRecords +{ + /// + /// 抄读任务Redis缓存数据记录 + /// + public class MeterReadingTelemetryPacketInfo : DeviceCacheBasicModel + { + /// + /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 + /// + public override string MemberId => $"{FocusId}:{MeterId}:{ItemCode}"; + + /// + /// ZSet排序索引分数值,具体值可以根据不同业务场景进行定义,例如时间戳 + /// + public override long ScoreValue => ((long)FocusId << 32) | (uint)DateTime.Now.Ticks; + + + /// + /// 是否手动操作 + /// + public bool ManualOrNot { get; set; } + + /// + /// 任务数据唯一标记 + /// + public string TaskMark { get; set; } + + /// + /// 时间戳标记,IoTDB时间列处理,上报通过构建标记获取唯一标记匹配时间戳。 + /// + public long Timestamps { get; set; } + + /// + /// 是否超时 + /// + public bool IsTimeout { get; set; } = false; + + /// + /// 待抄读时间 + /// + public DateTime PendingCopyReadTime { get; set; } + + + /// + /// 集中器地址 + /// + public string FocusAddress { get; set; } + + /// + /// 表地址 + /// + public string MeterAddress { get; set; } + + /// + /// 表类型 + /// + public MeterTypeEnum MeterType { get; set; } + + /// + /// 项目ID + /// + public int ProjectID { get; set; } + + /// + /// 数据库业务ID + /// + public int DatabaseBusiID { get; set; } + + /// + /// AFN功能码 + /// + public AFN AFN { get; set; } + + /// + /// 抄读功能码 + /// + public int Fn { get; set; } + + /// + /// 抄读计量点 + /// + public int Pn { get; set; } + + /// + /// 采集项编码 + /// + public string ItemCode { get; set;} + + + /// + /// 创建时间 + /// + public DateTime CreationTime { get; set; } + + /// + /// 下发消息内容 + /// + public string IssuedMessageHexString { get; set; } + + /// + /// 下发消息Id + /// + public string IssuedMessageId { get; set; } + + /// + /// 消息上报内容 + /// + public string? ReceivedMessageHexString { get; set; } + + /// + /// 消息上报时间 + /// + public DateTime? ReceivedTime { get; set; } + + /// + /// 上报消息Id + /// + public string ReceivedMessageId { get; set; } + + /// + /// 上报报文解析备注,异常情况下才有 + /// + public string ReceivedRemark { get; set; } + + //public void CreateDataId(Guid Id) + //{ + // this.Id = Id; + //} + } +} diff --git a/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs b/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs index c735a9a..966192b 100644 --- a/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs +++ b/src/JiShe.CollectBus.Domain/IotSystems/Watermeter/WatermeterInfo.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Models; using System; using System.Collections.Generic; using System.Linq; @@ -10,13 +11,8 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// /// 水表信息 /// - public class WatermeterInfo - { - /// - /// 水表ID - /// - public int ID { get; set; } - + public class WatermeterInfo: DeviceCacheBasicModel + { /// /// 水表名称 /// @@ -25,12 +21,7 @@ namespace JiShe.CollectBus.IotSystems.Watermeter /// 表密码 /// public string Password { get; set; } - - /// - /// 集中器ID - /// - public int FocusID { get; set; } - + /// /// 集中器地址 /// diff --git a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs index c76f9e6..d3a9bff 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/FreeRedisProvider.cs @@ -1,11 +1,14 @@ using FreeRedis; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.FreeRedisProvider.Options; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.FreeRedisProvider.Options; using Microsoft.Extensions.Options; using System.Diagnostics; using System.Text.Json; using Volo.Abp.DependencyInjection; +using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Collections.Concurrent; namespace JiShe.CollectBus.FreeRedisProvider { @@ -33,450 +36,472 @@ namespace JiShe.CollectBus.FreeRedisProvider public IRedisClient GetInstance() { - var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB}"; + var connectionString = $"{_option.Configuration},defaultdatabase={_option.DefaultDB},MaxPoolSize={_option.MaxPoolSize}"; Instance = new RedisClient(connectionString); Instance.Serialize = obj => BusJsonSerializer.Serialize(obj); Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type); - Instance.Notice += (s, e) => Trace.WriteLine(e.Log); + Instance.Notice += (s, e) => Trace.WriteLine(e.Log); return Instance; } - - //public async Task AddMeterZSetCacheData(string redisCacheKey, string redisCacheIndexKey, decimal score, T data) + ///// + ///// 单个添加数据 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// 集中器索引Set缓存Key + ///// 集中器排序索引ZSET缓存Key + ///// 集中器采集频率分组全局索引ZSet缓存Key + ///// 表计信息 + ///// 可选时间戳 + ///// + //public async Task AddMeterCacheData( + //string redisCacheKey, + //string redisCacheFocusIndexKey, + //string redisCacheScoresIndexKey, + //string redisCacheGlobalIndexKey, + //T data, + //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel //{ - // if (score < 0 || data == null || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // // 参数校验增强 + // if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) + // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) // { - // throw new Exception($"{nameof(AddMeterZSetCacheData)} 参数异常,-101"); + // throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101"); // } - // // 生成唯一member标识 - // var member = data.Serialize(); + // // 计算组合score(分类ID + 时间戳) + // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; - // // 计算score范围 - // decimal dataScore = (long)score << 32; + // long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks; - // //// 事务操作 - // //using (var tran = FreeRedisProvider.Instance.Multi()) - // //{ - // // await tran.ZAddAsync(cacheKey, score,member); - // // await tran.SAddAsync($"cat_index:{categoryId}", member); - // // object[] ret = tran.Exec(); - // //} + // //全局索引写入 + // long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); - // using (var pipe = Instance.StartPipe()) + // // 使用事务保证原子性 + // using (var trans = Instance.Multi()) // { - // pipe.ZAdd(redisCacheKey, dataScore, member); - // pipe.SAdd(redisCacheIndexKey, member); - // object[] ret = pipe.EndPipe(); + // // 主数据存储Hash + // trans.HSet(redisCacheKey, data.MemberID, data.Serialize()); + + // // 分类索引 + // trans.SAdd(redisCacheFocusIndexKey, data.MemberID); + + // // 排序索引使用ZSET + // trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID); + + // //全局索引 + // trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID); + + // var results = trans.Exec(); + + // if (results == null || results.Length <= 0) + // throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102"); // } // await Task.CompletedTask; //} - //public async Task> GetMeterZSetPagedData( + ///// + ///// 批量添加数据 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// 集中器索引Set缓存Key + ///// 集中器排序索引ZSET缓存Key + ///// 集中器采集频率分组全局索引ZSet缓存Key + ///// 数据集合 + ///// 可选时间戳 + ///// + //public async Task BatchAddMeterData( //string redisCacheKey, - //string redisCacheIndexKey, - //decimal score, - //int pageSize = 10, - //int pageIndex = 1) + //string redisCacheFocusIndexKey, + //string redisCacheScoresIndexKey, + //string redisCacheGlobalIndexKey, + //IEnumerable items, + //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel //{ - // if (score < 0 || string.IsNullOrWhiteSpace(redisCacheKey) || string.IsNullOrWhiteSpace(redisCacheIndexKey)) + // if (items == null + // || items.Count() <=0 + // || string.IsNullOrWhiteSpace(redisCacheKey) + // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) // { - // throw new Exception($"{nameof(GetMeterZSetPagedData)} 参数异常,-101"); + // throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101"); // } - // // 计算score范围 - // decimal minScore = (long)score << 32; - // decimal maxScore = ((long)score + 1) << 32; + // const int BATCH_SIZE = 1000; // 每批1000条 + // var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); - // // 分页参数 + // foreach (var batch in items.Batch(BATCH_SIZE)) + // { + // await semaphore.WaitAsync(); + + // _ = Task.Run(() => + // { + // using (var pipe = Instance.StartPipe()) + // { + // foreach (var item in batch) + // { + // // 计算组合score(分类ID + 时间戳) + // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; + + // long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks; + + // //全局索引写入 + // long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); + + // // 主数据存储Hash + // pipe.HSet(redisCacheKey, item.MemberID, item.Serialize()); + + // // 分类索引Set + // pipe.SAdd(redisCacheFocusIndexKey, item.MemberID); + + // // 排序索引使用ZSET + // pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID); + + // //全局索引 + // pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID); + // } + // pipe.EndPipe(); + // } + // semaphore.Release(); + // }); + // } + + // await Task.CompletedTask; + //} + + ///// + ///// 删除指定redis缓存key的缓存数据 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// 集中器索引Set缓存Key + ///// 集中器排序索引ZSET缓存Key + ///// 集中器采集频率分组全局索引ZSet缓存Key + ///// 表计信息 + ///// + //public async Task RemoveMeterData( + //string redisCacheKey, + //string redisCacheFocusIndexKey, + //string redisCacheScoresIndexKey, + //string redisCacheGlobalIndexKey, + //T data) where T : DeviceCacheBasicModel + //{ + + // if (data == null + // || string.IsNullOrWhiteSpace(redisCacheKey) + // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + // { + // throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101"); + // } + + // const string luaScript = @" + // local mainKey = KEYS[1] + // local focusIndexKey = KEYS[2] + // local scoresIndexKey = KEYS[3] + // local globalIndexKey = KEYS[4] + // local member = ARGV[1] + + // local deleted = 0 + // if redis.call('HDEL', mainKey, member) > 0 then + // deleted = 1 + // end + + // redis.call('SREM', focusIndexKey, member) + // redis.call('ZREM', scoresIndexKey, member) + // redis.call('ZREM', globalIndexKey, member) + // return deleted + // "; + + // var keys = new[] + // { + // redisCacheKey, + // redisCacheFocusIndexKey, + // redisCacheScoresIndexKey, + // redisCacheGlobalIndexKey + // }; + + // var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID }); + + // if ((int)result == 0) + // throw new KeyNotFoundException("指定数据不存在"); + //} + + ///// + ///// 修改表计缓存信息 + ///// + ///// + ///// 主数据存储Hash缓存Key + ///// 旧集中器索引Set缓存Key + ///// 新集中器索引Set缓存Key + ///// 集中器排序索引ZSET缓存Key + ///// 集中器采集频率分组全局索引ZSet缓存Key + ///// 表计信息 + ///// 可选时间戳 + ///// + //public async Task UpdateMeterData( + //string redisCacheKey, + //string oldRedisCacheFocusIndexKey, + //string newRedisCacheFocusIndexKey, + //string redisCacheScoresIndexKey, + //string redisCacheGlobalIndexKey, + //T newData, + //DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel + //{ + // if (newData == null + // || string.IsNullOrWhiteSpace(redisCacheKey) + // || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey) + // || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) + // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) + // { + // throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101"); + // } + + // var luaScript = @" + // local mainKey = KEYS[1] + // local oldFocusIndexKey = KEYS[2] + // local newFocusIndexKey = KEYS[3] + // local scoresIndexKey = KEYS[4] + // local globalIndexKey = KEYS[5] + // local member = ARGV[1] + // local newData = ARGV[2] + // local newScore = ARGV[3] + // local newGlobalScore = ARGV[4] + + // -- 校验存在性 + // if redis.call('HEXISTS', mainKey, member) == 0 then + // return 0 + // end + + // -- 更新主数据 + // redis.call('HSET', mainKey, member, newData) + + // -- 处理变更 + // if newScore ~= '' then + // -- 删除旧索引 + // redis.call('SREM', oldFocusIndexKey, member) + // redis.call('ZREM', scoresIndexKey, member) + + // -- 添加新索引 + // redis.call('SADD', newFocusIndexKey, member) + // redis.call('ZADD', scoresIndexKey, newScore, member) + // end + + // -- 更新全局索引 + // if newGlobalScore ~= '' then + // -- 删除旧索引 + // redis.call('ZREM', globalIndexKey, member) + + // -- 添加新索引 + // redis.call('ZADD', globalIndexKey, newGlobalScore, member) + // end + + // return 1 + // "; + + // var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; + // var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds(); + // var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks; + + // var result = await Instance.EvalAsync(luaScript, + // new[] + // { + // redisCacheKey, + // oldRedisCacheFocusIndexKey, + // newRedisCacheFocusIndexKey, + // redisCacheScoresIndexKey, + // redisCacheGlobalIndexKey + // }, + // new object[] + // { + // newData.MemberID, + // newData.Serialize(), + // newScoreValue.ToString() ?? "", + // newGlobalScore.ToString() ?? "" + // }); + + // if ((int)result == 0) + // { + // throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在"); + // } + //} + + //public async Task> SingleGetMeterPagedData( + //string redisCacheKey, + //string redisCacheScoresIndexKey, + //int focusId, + //int pageSize = 10, + //int pageIndex = 1, + //bool descending = true) + //{ + // // 计算score范围 + // long minScore = (long)focusId << 32; + // long maxScore = ((long)focusId + 1) << 32; + + // // 分页参数计算 // int start = (pageIndex - 1) * pageSize; - // // 查询主数据 - // var members = await Instance.ZRevRangeByScoreAsync( - // redisCacheKey, - // maxScore, + // // 获取排序后的member列表 + // var members = descending + // ? await Instance.ZRevRangeByScoreAsync( + // redisCacheScoresIndexKey, + // maxScore, + // minScore, + // start, + // pageSize) + // : await Instance.ZRangeByScoreAsync( + // redisCacheScoresIndexKey, + // minScore, + // maxScore, + // start, + // pageSize); + + // // 批量获取实际数据 + // var dataTasks = members.Select(m => + // Instance.HGetAsync(redisCacheKey, m)).ToArray(); + // await Task.WhenAll(dataTasks); + + // // 总数统计优化 + // var total = await Instance.ZCountAsync( + // redisCacheScoresIndexKey, // minScore, - // offset: start, - // count: pageSize - // ); - - // if (members == null) - // { - // throw new Exception($"{nameof(GetMeterZSetPagedData)} 获取缓存的信息失败,第 {pageIndex + 1} 页数据未返回,-102"); - // } - - // // 查询总数 - // var total = await Instance.ZCountAsync(redisCacheKey, minScore, maxScore); + // maxScore); // return new BusPagedResult // { - // Items = members.Select(m => - // BusJsonSerializer.Deserialize(m)!).ToList(), + // Items = dataTasks.Select(t => t.Result).ToList(), // TotalCount = total, // PageIndex = pageIndex, // PageSize = pageSize // }; //} - ///// - ///// 删除数据示例 - ///// - ///// - ///// 分类 - ///// - ///// - ///// - //public async Task RemoveMeterZSetData( + + //public async Task> GetFocusPagedData( //string redisCacheKey, - //string redisCacheIndexKey, - //T data) + //string redisCacheScoresIndexKey, + //int focusId, + //int pageSize = 10, + //long? lastScore = null, + //string lastMember = null, + //bool descending = true) where T : DeviceCacheBasicModel //{ + // // 计算分数范围 + // long minScore = (long)focusId << 32; + // long maxScore = ((long)focusId + 1) << 32; - // // 查询需要删除的member - // var members = await Instance.SMembersAsync(redisCacheIndexKey); - // var target = members.FirstOrDefault(m => - // BusJsonSerializer.Deserialize(m) == data);//泛型此处该如何处理? + // // 获取成员列表 + // var members = await GetSortedMembers( + // redisCacheScoresIndexKey, + // minScore, + // maxScore, + // pageSize, + // lastScore, + // lastMember, + // descending); - // if (target != null) + // // 批量获取数据 + // var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems); + + // return new BusPagedResult // { - // using (var trans = Instance.Multi()) - // { - // trans.ZRem(redisCacheKey, target); - // trans.SRem(redisCacheIndexKey, target); - // trans.Exec(); - // } - // } + // Items = dataDict, + // TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore), + // HasNext = members.HasNext, + // NextScore = members.NextScore, + // NextMember = members.NextMember + // }; + //} - // await Task.CompletedTask; + //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)> + // GetSortedMembers( + // string zsetKey, + // long minScore, + // long maxScore, + // int pageSize, + // long? lastScore, + // string lastMember, + // bool descending) + //{ + // var querySize = pageSize + 1; + // var (startScore, exclude) = descending + // ? (lastScore ?? maxScore, lastMember) + // : (lastScore ?? minScore, lastMember); + + // var members = descending + // ? await Instance.ZRevRangeByScoreAsync( + // zsetKey, + // max: startScore, + // min: minScore, + // offset: 0, + // count: querySize) + // : await Instance.ZRangeByScoreAsync( + // zsetKey, + // min: startScore, + // max: maxScore, + // offset: 0, + // count: querySize); + + // var hasNext = members.Length > pageSize; + // var currentItems = members.Take(pageSize).ToArray(); + + // var nextCursor = currentItems.Any() + // ? await GetNextCursor(zsetKey, currentItems.Last(), descending) + // : (null, null); + + // return (currentItems, hasNext, nextCursor.score, nextCursor.member); + //} + + //private async Task GetTotalCount(string zsetKey, long min, long max) + //{ + // // 缓存计数优化 + // var cacheKey = $"{zsetKey}_count_{min}_{max}"; + // var cached = await Instance.GetAsync(cacheKey); + + // if (cached.HasValue) + // return cached.Value; + + // var count = await Instance.ZCountAsync(zsetKey, min, max); + // await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒 + // return count; //} - public async Task AddMeterZSetCacheData( - string redisCacheKey, - string redisCacheIndexKey, - int categoryId, // 新增分类ID参数 - T data, - DateTimeOffset? timestamp = null) - { - // 参数校验增强 - if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) - || string.IsNullOrWhiteSpace(redisCacheIndexKey)) - { - throw new ArgumentException("Invalid parameters"); - } + //public async Task>> BatchGetMeterPagedData( + //string redisCacheKey, + //string redisCacheScoresIndexKey, + //IEnumerable focusIds, + //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel + //{ + // var results = new ConcurrentDictionary>(); + // var parallelOptions = new ParallelOptions + // { + // MaxDegreeOfParallelism = Environment.ProcessorCount * 2 + // }; - // 生成唯一member标识(带数据指纹) - var member = $"{categoryId}:{Guid.NewGuid()}"; - var serializedData = data.Serialize(); + // await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) => + // { + // var data = await SingleGetMeterPagedData( + // redisCacheKey, + // redisCacheScoresIndexKey, + // focusId, + // pageSizePerFocus); - // 计算组合score(分类ID + 时间戳) - var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; + // results.TryAdd(focusId, data); + // }); - long scoreValue = ((long)categoryId << 32) | (uint)actualTimestamp.Ticks; - - //全局索引写入 - long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); + // return new Dictionary>(results); + //} + - // 使用事务保证原子性 - using (var trans = Instance.Multi()) - { - // 主数据存储Hash - trans.HSet(redisCacheKey, member, serializedData); - - // 排序索引使用ZSET - trans.ZAdd($"{redisCacheKey}_scores", scoreValue, member); - - // 分类索引 - trans.SAdd(redisCacheIndexKey, member); - - //全局索引 - trans.ZAdd("global_data_all", globalScore, member); - - var results = trans.Exec(); - - if (results == null || results.Length <= 0) - throw new Exception("Transaction failed"); - } - - await Task.CompletedTask; - } - - public async Task BatchAddMeterData( - string redisCacheKey, - string indexKey, - IEnumerable items) where T : DeviceCacheBasicModel - { - const int BATCH_SIZE = 1000; // 每批1000条 - var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); - - //foreach (var batch in items.Batch(BATCH_SIZE)) - //{ - // await semaphore.WaitAsync(); - - // _ = Task.Run(async () => - // { - // using (var pipe = FreeRedisProvider.Instance.StartPipe()) - // { - // foreach (var item in batch) - // { - // var member = $"{item.CategoryId}:{Guid.NewGuid()}"; - // long score = ((long)item.CategoryId << 32) | (uint)item.Timestamp.Ticks; - - // // Hash主数据 - // pipe.HSet(redisCacheKey, member, item.Data.Serialize()); - - // // 分类索引 - // pipe.ZAdd($"{redisCacheKey}_scores", score, member); - - // // 全局索引 - // pipe.ZAdd("global_data_all", item.Timestamp.ToUnixTimeMilliseconds(), member); - - // // 分类快速索引 - // pipe.SAdd(indexKey, member); - // } - // pipe.EndPipe(); - // } - // semaphore.Release(); - // }); - //} - - await Task.CompletedTask; - } - - public async Task UpdateMeterData( - string redisCacheKey, - string oldCategoryIndexKey, - string newCategoryIndexKey, - string memberId, // 唯一标识(格式:"分类ID:GUID") - T newData, - int? newCategoryId = null, - DateTimeOffset? newTimestamp = null) - { - // 参数校验 - if (string.IsNullOrWhiteSpace(memberId)) - throw new ArgumentException("Invalid member ID"); - - var luaScript = @" - local mainKey = KEYS[1] - local scoreKey = KEYS[2] - local oldIndex = KEYS[3] - local newIndex = KEYS[4] - local member = ARGV[1] - local newData = ARGV[2] - local newScore = ARGV[3] - - -- 校验旧数据是否存在 - if redis.call('HEXISTS', mainKey, member) == 0 then - return 0 - end - - -- 更新主数据 - redis.call('HSET', mainKey, member, newData) - - -- 处理分类变更 - if newScore ~= '' then - -- 删除旧索引 - redis.call('SREM', oldIndex, member) - -- 更新排序分数 - redis.call('ZADD', scoreKey, newScore, member) - -- 添加新索引 - redis.call('SADD', newIndex, member) - end - - return 1 - "; - - // 计算新score(当分类或时间变化时) - long? newScoreValue = null; - if (newCategoryId.HasValue || newTimestamp.HasValue) - { - var parts = memberId.Split(':'); - var oldCategoryId = int.Parse(parts[0]); - - var actualCategoryId = newCategoryId ?? oldCategoryId; - var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; - - newScoreValue = ((long)actualCategoryId << 32) | (uint)actualTimestamp.Ticks; - } - - var result = await Instance.EvalAsync(luaScript, - new[] - { - redisCacheKey, - $"{redisCacheKey}_scores", - oldCategoryIndexKey, - newCategoryIndexKey - }, - new[] - { - memberId, - newData.Serialize(), - newScoreValue?.ToString() ?? "" - }); - - // 如果时间戳变化则更新全局索引 - if (newTimestamp.HasValue) - { - long newGlobalScore = newTimestamp.Value.ToUnixTimeMilliseconds(); - await Instance.ZAddAsync("global_data_all", newGlobalScore, memberId); - } - - if ((int)result == 0) - throw new KeyNotFoundException("指定数据不存在"); - } - - - public async Task> GetMeterZSetPagedData( - string redisCacheKey, - string redisCacheIndexKey, - int categoryId, - int pageSize = 10, - int pageIndex = 1, - bool descending = true) - { - // 计算score范围 - long minScore = (long)categoryId << 32; - long maxScore = ((long)categoryId + 1) << 32; - - // 分页参数计算 - int start = (pageIndex - 1) * pageSize; - - // 获取排序后的member列表 - var members = descending - ? await Instance.ZRevRangeByScoreAsync( - $"{redisCacheKey}_scores", - maxScore, - minScore, - start, - pageSize) - : await Instance.ZRangeByScoreAsync( - $"{redisCacheKey}_scores", - minScore, - maxScore, - start, - pageSize); - - // 批量获取实际数据 - var dataTasks = members.Select(m => - Instance.HGetAsync(redisCacheKey, m)).ToArray(); - await Task.WhenAll(dataTasks); - - // 总数统计优化 - var total = await Instance.ZCountAsync( - $"{redisCacheKey}_scores", - minScore, - maxScore); - - return new BusPagedResult - { - Items = dataTasks.Select(t => t.Result).ToList(), - TotalCount = total, - PageIndex = pageIndex, - PageSize = pageSize - }; - } - - - public async Task RemoveMeterZSetData( - string redisCacheKey, - string redisCacheIndexKey, - string uniqueId) // 改为基于唯一标识删除 - { - // 原子操作 - var luaScript = @" - local mainKey = KEYS[1] - local scoreKey = KEYS[2] - local indexKey = KEYS[3] - local member = ARGV[1] - - redis.call('HDEL', mainKey, member) - redis.call('ZREM', scoreKey, member) - redis.call('SREM', indexKey, member) - return 1 - "; - - var keys = new[] - { - redisCacheKey, - $"{redisCacheKey}_scores", - redisCacheIndexKey - }; - - var result = await Instance.EvalAsync(luaScript, - keys, - new[] { uniqueId }); - - if ((int)result != 1) - throw new Exception("删除操作失败"); - } - - public async Task> GetGlobalPagedData( - string redisCacheKey, - int pageSize = 10, - long? lastScore = null, - string lastMember = null, - bool descending = true) - { - const string zsetKey = "global_data_all"; - - // 分页参数处理 - var (startScore, excludeMember) = descending - ? (lastScore ?? long.MaxValue, lastMember) - : (lastScore ?? 0, lastMember); - - // 获取成员列表 - string[] members; - if (descending) - { - members = await Instance.ZRevRangeByScoreAsync( - zsetKey, - max: startScore, - min: 0, - offset: 0, - count: pageSize + 1); - } - else - { - members = await Instance.ZRangeByScoreAsync( - zsetKey, - min: startScore, - max: long.MaxValue, - offset: 0, - count: pageSize + 1); - } - - // 处理分页结果 - bool hasNext = members.Length > pageSize; - var actualMembers = members.Take(pageSize).ToArray(); - - // 批量获取数据(优化版本) - var dataTasks = actualMembers - .Select(m => Instance.HGetAsync(redisCacheKey, m)) - .ToArray(); - await Task.WhenAll(dataTasks); - - // 获取下一页游标 - (long? nextScore, string nextMember) = actualMembers.Any() - ? await GetNextCursor(zsetKey, actualMembers.Last(), descending) - : (null, null); - - return new GlobalPagedResult - { - Items = dataTasks.Select(t => t.Result).ToList(), - HasNext = hasNext, - NextScore = nextScore, - NextMember = nextMember - }; - } - - private async Task<(long? score, string member)> GetNextCursor( - string zsetKey, - string lastMember, - bool descending) - { - var score = await Instance.ZScoreAsync(zsetKey, lastMember); - return (score.HasValue ? (long)score.Value : null, lastMember); - } + } } \ No newline at end of file diff --git a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs index cc3ff02..8fc6861 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/IFreeRedisProvider.cs @@ -1,4 +1,5 @@ using FreeRedis; +using JiShe.CollectBus.Common.Models; namespace JiShe.CollectBus.FreeRedisProvider { @@ -8,7 +9,7 @@ namespace JiShe.CollectBus.FreeRedisProvider /// 获取客户端 /// /// - RedisClient Instance { get; set; } + RedisClient Instance { get; set; } } } diff --git a/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs b/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs index 75d92a2..2ed7e49 100644 --- a/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs +++ b/src/JiShe.CollectBus.FreeRedisProvider/Options/FreeRedisOptions.cs @@ -13,6 +13,11 @@ namespace JiShe.CollectBus.FreeRedisProvider.Options /// public string? Configuration { get; set; } + /// + /// 最大连接数 + /// + public string? MaxPoolSize { get; set; } + /// /// 默认数据库 /// diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 91ff4ea..f830481 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -41,6 +41,7 @@ }, "Redis": { "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", + "MaxPoolSize": "50", "DefaultDB": "14", "HangfireDB": "15" }, @@ -129,7 +130,7 @@ "OpenDebugMode": true, "UseTableSessionPoolByDefault": false }, - "ServerTagName": "JiSheCollectBus", + "ServerTagName": "JiSheCollectBus3", "KafkaReplicationFactor": 3, "NumPartitions": 30, "Cassandra": { diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index c43fa70..159f0fc 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -11,8 +11,6 @@ using Microsoft.Extensions.DependencyInjection; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.Protocols; using MassTransit; -using JiShe.CollectBus.Kafka.Producer; -using JiShe.CollectBus.Common.Helpers; using DotNetCore.CAP; namespace JiShe.CollectBus.Protocol.Contracts.Abstracts diff --git a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj index 2721676..3aa7a77 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj +++ b/src/JiShe.CollectBus.Protocol.Contracts/JiShe.CollectBus.Protocol.Contracts.csproj @@ -7,7 +7,6 @@ -