diff --git a/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs b/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs index 4c81d02..b2cf9e5 100644 --- a/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs +++ b/modules/JiShe.CollectBus.FreeRedis/FreeRedisProvider.cs @@ -37,466 +37,6 @@ namespace JiShe.CollectBus.FreeRedis Instance.Deserialize = (json, type) => BusJsonSerializer.Deserialize(json, type); Instance.Notice += (s, e) => Trace.WriteLine(e.Log); return Instance; - } - - ///// - ///// 单个添加数据 - ///// - ///// - ///// 主数据存储Hash缓存Key - ///// 集中器索引Set缓存Key - ///// 集中器排序索引ZSET缓存Key - ///// 集中器采集频率分组全局索引ZSet缓存Key - ///// 表计信息 - ///// 可选时间戳 - ///// - //public async Task AddMeterCacheData( - //string redisCacheKey, - //string redisCacheFocusIndexKey, - //string redisCacheScoresIndexKey, - //string redisCacheGlobalIndexKey, - //T data, - //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel - //{ - // // 参数校验增强 - // if (data == null || string.IsNullOrWhiteSpace(redisCacheKey) - // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) - // { - // throw new ArgumentException($"{nameof(AddMeterCacheData)} 参数异常,-101"); - // } - - // // 计算组合score(分类ID + 时间戳) - // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; - - // long scoreValue = ((long)data.FocusId << 32) | (uint)actualTimestamp.Ticks; - - // //全局索引写入 - // long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); - - // // 使用事务保证原子性 - // using (var trans = Instance.Multi()) - // { - // // 主数据存储Hash - // trans.HSet(redisCacheKey, data.MemberID, data.Serialize()); - - // // 分类索引 - // trans.SAdd(redisCacheFocusIndexKey, data.MemberID); - - // // 排序索引使用ZSET - // trans.ZAdd(redisCacheScoresIndexKey, scoreValue, data.MemberID); - - // //全局索引 - // trans.ZAdd(redisCacheGlobalIndexKey, globalScore, data.MemberID); - - // var results = trans.Exec(); - - // if (results == null || results.Length <= 0) - // throw new Exception($"{nameof(AddMeterCacheData)} 事务提交失败,-102"); - // } - - // await Task.CompletedTask; - //} - - ///// - ///// 批量添加数据 - ///// - ///// - ///// 主数据存储Hash缓存Key - ///// 集中器索引Set缓存Key - ///// 集中器排序索引ZSET缓存Key - ///// 集中器采集频率分组全局索引ZSet缓存Key - ///// 数据集合 - ///// 可选时间戳 - ///// - //public async Task BatchAddMeterData( - //string redisCacheKey, - //string redisCacheFocusIndexKey, - //string redisCacheScoresIndexKey, - //string redisCacheGlobalIndexKey, - //IEnumerable items, - //DateTimeOffset? timestamp = null) where T : DeviceCacheBasicModel - //{ - // if (items == null - // || items.Count() <=0 - // || string.IsNullOrWhiteSpace(redisCacheKey) - // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) - // { - // throw new ArgumentException($"{nameof(BatchAddMeterData)} 参数异常,-101"); - // } - - // const int BATCH_SIZE = 1000; // 每批1000条 - // var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); - - // foreach (var batch in items.Batch(BATCH_SIZE)) - // { - // await semaphore.WaitAsync(); - - // _ = Task.Run(() => - // { - // using (var pipe = Instance.StartPipe()) - // { - // foreach (var item in batch) - // { - // // 计算组合score(分类ID + 时间戳) - // var actualTimestamp = timestamp ?? DateTimeOffset.UtcNow; - - // long scoreValue = ((long)item.FocusId << 32) | (uint)actualTimestamp.Ticks; - - // //全局索引写入 - // long globalScore = actualTimestamp.ToUnixTimeMilliseconds(); - - // // 主数据存储Hash - // pipe.HSet(redisCacheKey, item.MemberID, item.Serialize()); - - // // 分类索引Set - // pipe.SAdd(redisCacheFocusIndexKey, item.MemberID); - - // // 排序索引使用ZSET - // pipe.ZAdd(redisCacheScoresIndexKey, scoreValue, item.MemberID); - - // //全局索引 - // pipe.ZAdd(redisCacheGlobalIndexKey, globalScore, item.MemberID); - // } - // pipe.EndPipe(); - // } - // semaphore.Release(); - // }); - // } - - // await Task.CompletedTask; - //} - - ///// - ///// 删除指定redis缓存key的缓存数据 - ///// - ///// - ///// 主数据存储Hash缓存Key - ///// 集中器索引Set缓存Key - ///// 集中器排序索引ZSET缓存Key - ///// 集中器采集频率分组全局索引ZSet缓存Key - ///// 表计信息 - ///// - //public async Task RemoveMeterData( - //string redisCacheKey, - //string redisCacheFocusIndexKey, - //string redisCacheScoresIndexKey, - //string redisCacheGlobalIndexKey, - //T data) where T : DeviceCacheBasicModel - //{ - - // if (data == null - // || string.IsNullOrWhiteSpace(redisCacheKey) - // || string.IsNullOrWhiteSpace(redisCacheFocusIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) - // { - // throw new ArgumentException($"{nameof(RemoveMeterData)} 参数异常,-101"); - // } - - // const string luaScript = @" - // local mainKey = KEYS[1] - // local focusIndexKey = KEYS[2] - // local scoresIndexKey = KEYS[3] - // local globalIndexKey = KEYS[4] - // local member = ARGV[1] - - // local deleted = 0 - // if redis.call('HDEL', mainKey, member) > 0 then - // deleted = 1 - // end - - // redis.call('SREM', focusIndexKey, member) - // redis.call('ZREM', scoresIndexKey, member) - // redis.call('ZREM', globalIndexKey, member) - // return deleted - // "; - - // var keys = new[] - // { - // redisCacheKey, - // redisCacheFocusIndexKey, - // redisCacheScoresIndexKey, - // redisCacheGlobalIndexKey - // }; - - // var result = await Instance.EvalAsync(luaScript, keys, new[] { data.MemberID }); - - // if ((int)result == 0) - // throw new KeyNotFoundException("指定数据不存在"); - //} - - ///// - ///// 修改表计缓存信息 - ///// - ///// - ///// 主数据存储Hash缓存Key - ///// 旧集中器索引Set缓存Key - ///// 新集中器索引Set缓存Key - ///// 集中器排序索引ZSET缓存Key - ///// 集中器采集频率分组全局索引ZSet缓存Key - ///// 表计信息 - ///// 可选时间戳 - ///// - //public async Task UpdateMeterData( - //string redisCacheKey, - //string oldRedisCacheFocusIndexKey, - //string newRedisCacheFocusIndexKey, - //string redisCacheScoresIndexKey, - //string redisCacheGlobalIndexKey, - //T newData, - //DateTimeOffset? newTimestamp = null) where T : DeviceCacheBasicModel - //{ - // if (newData == null - // || string.IsNullOrWhiteSpace(redisCacheKey) - // || string.IsNullOrWhiteSpace(oldRedisCacheFocusIndexKey) - // || string.IsNullOrWhiteSpace(newRedisCacheFocusIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheScoresIndexKey) - // || string.IsNullOrWhiteSpace(redisCacheGlobalIndexKey)) - // { - // throw new ArgumentException($"{nameof(UpdateMeterData)} 参数异常,-101"); - // } - - // var luaScript = @" - // local mainKey = KEYS[1] - // local oldFocusIndexKey = KEYS[2] - // local newFocusIndexKey = KEYS[3] - // local scoresIndexKey = KEYS[4] - // local globalIndexKey = KEYS[5] - // local member = ARGV[1] - // local newData = ARGV[2] - // local newScore = ARGV[3] - // local newGlobalScore = ARGV[4] - - // -- 校验存在性 - // if redis.call('HEXISTS', mainKey, member) == 0 then - // return 0 - // end - - // -- 更新主数据 - // redis.call('HSET', mainKey, member, newData) - - // -- 处理变更 - // if newScore ~= '' then - // -- 删除旧索引 - // redis.call('SREM', oldFocusIndexKey, member) - // redis.call('ZREM', scoresIndexKey, member) - - // -- 添加新索引 - // redis.call('SADD', newFocusIndexKey, member) - // redis.call('ZADD', scoresIndexKey, newScore, member) - // end - - // -- 更新全局索引 - // if newGlobalScore ~= '' then - // -- 删除旧索引 - // redis.call('ZREM', globalIndexKey, member) - - // -- 添加新索引 - // redis.call('ZADD', globalIndexKey, newGlobalScore, member) - // end - - // return 1 - // "; - - // var actualTimestamp = newTimestamp ?? DateTimeOffset.UtcNow; - // var newGlobalScore = actualTimestamp.ToUnixTimeMilliseconds(); - // var newScoreValue = ((long)newData.FocusId << 32) | (uint)actualTimestamp.Ticks; - - // var result = await Instance.EvalAsync(luaScript, - // new[] - // { - // redisCacheKey, - // oldRedisCacheFocusIndexKey, - // newRedisCacheFocusIndexKey, - // redisCacheScoresIndexKey, - // redisCacheGlobalIndexKey - // }, - // new object[] - // { - // newData.MemberID, - // newData.Serialize(), - // newScoreValue.ToString() ?? "", - // newGlobalScore.ToString() ?? "" - // }); - - // if ((int)result == 0) - // { - // throw new KeyNotFoundException($"{nameof(UpdateMeterData)}指定Key{redisCacheKey}的数据不存在"); - // } - //} - - //public async Task> SingleGetMeterPagedData( - //string redisCacheKey, - //string redisCacheScoresIndexKey, - //int focusId, - //int pageSize = 10, - //int pageIndex = 1, - //bool descending = true) - //{ - // // 计算score范围 - // long minScore = (long)focusId << 32; - // long maxScore = ((long)focusId + 1) << 32; - - // // 分页参数计算 - // int start = (pageIndex - 1) * pageSize; - - // // 获取排序后的member列表 - // var members = descending - // ? await Instance.ZRevRangeByScoreAsync( - // redisCacheScoresIndexKey, - // maxScore, - // minScore, - // start, - // pageSize) - // : await Instance.ZRangeByScoreAsync( - // redisCacheScoresIndexKey, - // minScore, - // maxScore, - // start, - // pageSize); - - // // 批量获取实际数据 - // var dataTasks = members.Select(m => - // Instance.HGetAsync(redisCacheKey, m)).ToArray(); - // await Task.WhenAll(dataTasks); - - // // 总数统计优化 - // var total = await Instance.ZCountAsync( - // redisCacheScoresIndexKey, - // minScore, - // maxScore); - - // return new BusPagedResult - // { - // Items = dataTasks.Select(t => t.Result).ToList(), - // TotalCount = total, - // PageIndex = pageIndex, - // PageSize = pageSize - // }; - //} - - - //public async Task> GetFocusPagedData( - //string redisCacheKey, - //string redisCacheScoresIndexKey, - //int focusId, - //int pageSize = 10, - //long? lastScore = null, - //string lastMember = null, - //bool descending = true) where T : DeviceCacheBasicModel - //{ - // // 计算分数范围 - // long minScore = (long)focusId << 32; - // long maxScore = ((long)focusId + 1) << 32; - - // // 获取成员列表 - // var members = await GetSortedMembers( - // redisCacheScoresIndexKey, - // minScore, - // maxScore, - // pageSize, - // lastScore, - // lastMember, - // descending); - - // // 批量获取数据 - // var dataDict = await Instance.HMGetAsync(redisCacheKey, members.CurrentItems); - - // return new BusPagedResult - // { - // Items = dataDict, - // TotalCount = await GetTotalCount(redisCacheScoresIndexKey, minScore, maxScore), - // HasNext = members.HasNext, - // NextScore = members.NextScore, - // NextMember = members.NextMember - // }; - //} - - //private async Task<(string[] CurrentItems, bool HasNext, decimal? NextScore, string NextMember)> - // GetSortedMembers( - // string zsetKey, - // long minScore, - // long maxScore, - // int pageSize, - // long? lastScore, - // string lastMember, - // bool descending) - //{ - // var querySize = pageSize + 1; - // var (startScore, exclude) = descending - // ? (lastScore ?? maxScore, lastMember) - // : (lastScore ?? minScore, lastMember); - - // var members = descending - // ? await Instance.ZRevRangeByScoreAsync( - // zsetKey, - // max: startScore, - // min: minScore, - // offset: 0, - // count: querySize) - // : await Instance.ZRangeByScoreAsync( - // zsetKey, - // min: startScore, - // max: maxScore, - // offset: 0, - // count: querySize); - - // var hasNext = members.Length > pageSize; - // var currentItems = members.Take(pageSize).ToArray(); - - // var nextCursor = currentItems.Any() - // ? await GetNextCursor(zsetKey, currentItems.Last(), descending) - // : (null, null); - - // return (currentItems, hasNext, nextCursor.score, nextCursor.member); - //} - - //private async Task GetTotalCount(string zsetKey, long min, long max) - //{ - // // 缓存计数优化 - // var cacheKey = $"{zsetKey}_count_{min}_{max}"; - // var cached = await Instance.GetAsync(cacheKey); - - // if (cached.HasValue) - // return cached.Value; - - // var count = await Instance.ZCountAsync(zsetKey, min, max); - // await Instance.SetExAsync(cacheKey, 60, count); // 缓存60秒 - // return count; - //} - - - //public async Task>> BatchGetMeterPagedData( - //string redisCacheKey, - //string redisCacheScoresIndexKey, - //IEnumerable focusIds, - //int pageSizePerFocus = 10) where T : DeviceCacheBasicModel - //{ - // var results = new ConcurrentDictionary>(); - // var parallelOptions = new ParallelOptions - // { - // MaxDegreeOfParallelism = Environment.ProcessorCount * 2 - // }; - - // await Parallel.ForEachAsync(focusIds, parallelOptions, async (focusId, _) => - // { - // var data = await SingleGetMeterPagedData( - // redisCacheKey, - // redisCacheScoresIndexKey, - // focusId, - // pageSizePerFocus); - - // results.TryAdd(focusId, data); - // }); - - // return new Dictionary>(results); - //} - - - + } } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs index 963d45f..791240d 100644 --- a/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs +++ b/modules/JiShe.CollectBus.IoTDB/Provider/IoTDBProvider.cs @@ -132,9 +132,8 @@ namespace JiShe.CollectBus.IoTDB.Provider public async Task BatchInsertAsync(DeviceMetadata deviceMetadata, IEnumerable entities) where T : IoTEntity { try - { - - var batchSize = 2000; + { + var batchSize = 1000; var batches = entities.Chunk(batchSize); foreach (var batch in batches) diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index 2183ca2..f4a8371 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -72,7 +72,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData /// public async Task GetMeterInfoAsync(string meterType,string timeDensity="15") { - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}"; + var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, _applicationOptions.SystemType, _applicationOptions.ServerTagName, meterType, timeDensity)}"; // TODO:临时写死,等确认后如何取再调整 return await Task.FromResult(new AmmeterInfo() { ProjectID = 10000, diff --git a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs index d870202..5ed2924 100644 --- a/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/RedisDataCache/IRedisDataCacheService.cs @@ -28,33 +28,16 @@ namespace JiShe.CollectBus.Application.Contracts string redisZSetScoresIndexCacheKey, T data) where T : DeviceCacheBasicModel; + /// /// 批量添加数据 /// /// - /// 主数据存储Hash缓存Key /// Set索引缓存Key - /// ZSET索引缓存Key - /// 设备缓存信息 + /// hash缓存Key /// 待缓存数据集合 /// Task BatchInsertDataAsync( - string redisHashCacheKey, - string redisSetIndexCacheKey, - string redisZSetScoresIndexCacheKey, - string redisDeviceInfoHashCacheKey, - IEnumerable items) where T : DeviceCacheBasicModel; - - - /// - /// 批量添加数据 - /// - /// - /// Set索引缓存Key - /// 设备缓存信息 - /// 待缓存数据集合 - /// - Task BatchInsertDataAsync2( string redisSetIndexCacheKey, string redisDeviceInfoHashCacheKey, Dictionary> items) where T : DeviceCacheBasicModel; @@ -172,45 +155,24 @@ namespace JiShe.CollectBus.Application.Contracts bool descending = true) where T : DeviceCacheBasicModel; - - ///// - ///// 游标分页查询 - ///// - ///// 排序索引ZSET缓存Key - ///// 分页数量 - ///// 开始索引 - ///// 开始唯一标识 - ///// 排序方式 - ///// - //Task<(List Members, bool HasNext)> GetPagedMembers( - // string redisZSetScoresIndexCacheKey, - // int pageSize, - // decimal? startScore, - // string excludeMember, - // bool descending); - - ///// - ///// 批量获取指定分页的数据 - ///// - ///// - ///// Hash表缓存key - ///// Hash表字段集合 - ///// - //Task> BatchGetData( - // string redisHashCacheKey, - // IEnumerable members) - // where T : DeviceCacheBasicModel; - - ///// - ///// 获取下一页游标 - ///// - ///// 排序索引ZSET缓存Key - ///// 最后一个唯一标识 - ///// 排序方式 - ///// - //Task GetNextScore( - // string redisZSetScoresIndexCacheKey, - // string lastMember, - // bool descending); + /// + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 + /// + /// + /// ZSET索引缓存Key + /// 主数据存储Hash缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + Task> GetAllPagedData2( + string redisCacheDeviceGroupSetIndexKey, + string redisCacheDeviceInfoHashKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel; } } diff --git a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs index 66cbf2f..b82feab 100644 --- a/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IotSystems.Ammeters; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using Volo.Abp.Application.Services; @@ -34,7 +35,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集端Code /// - Task> GetAmmeterInfoList(string gatherCode = ""); + Task> GetAmmeterInfoList(string gatherCode = ""); /// /// 初始化电表缓存数据 @@ -82,7 +83,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); + Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps); /// /// 日冻结抄读 @@ -92,7 +93,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); + Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps); /// /// 月冻结数据抄读 @@ -102,7 +103,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); + Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps); #endregion @@ -112,7 +113,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集端Code /// - Task> GetWatermeterInfoList(string gatherCode = ""); + Task> GetWatermeterInfoList(string gatherCode = ""); /// /// 初始化水表缓存数据,只获取任务数据下发,不构建任务 @@ -139,7 +140,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); + Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps); /// /// 自动获取远程通信模块(SIM)版本信息 @@ -149,7 +150,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps); + Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps); #endregion diff --git a/services/JiShe.CollectBus.Application/CollectBusAppService.cs b/services/JiShe.CollectBus.Application/CollectBusAppService.cs index 7c66b95..bd4a232 100644 --- a/services/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/services/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -25,200 +25,5 @@ public abstract class CollectBusAppService : ApplicationService { LocalizationResource = typeof(CollectBusResource); ObjectMapperContext = typeof(CollectBusApplicationModule); - } - - /// - /// Lua脚本批量获取缓存的表计信息 - /// - /// 表信息数据对象 - /// 采集频率对应的缓存Key集合 - /// 系统类型 - /// 服务器标识 - /// 采集频率,1分钟、5分钟、15分钟 - /// 表计类型 - /// - protected async Task>> GetMeterRedisCacheDictionaryData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class - { - if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) - { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101"); - } - - var meterInfos = new Dictionary>(); - var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - - // 分页参数:每页处理10000个键 - int pageSize = 10000; - int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); - - for (int page = 0; page < totalPages; page++) - { - // 分页获取当前批次的键 - var batchKeys = redisKeys - .Skip(page * pageSize) - .Take(pageSize) - .ToArray(); - - // 执行Lua脚本获取当前批次数据 - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); - if (merterResult == null) - { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102"); - } - - // 解析当前批次的结果 - if (merterResult is object[] arr) - { - foreach (object[] item in arr) - { - string key = (string)item[0]; - object[] fieldsAndValues = (object[])item[1]; - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, systemType, serverTagName, meterType, timeDensity)}"; - string focusAddress = key.Replace(redisCacheKey, ""); - - var meterHashs = new Dictionary(); - for (int i = 0; i < fieldsAndValues.Length; i += 2) - { - string meterId = (string)fieldsAndValues[i]; - string meterStr = (string)fieldsAndValues[i + 1]; - - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) - { - meterInfo = meterStr.Deserialize()!; - } - if (meterInfo != null) - { - meterHashs[meterId] = meterInfo; - } - else - { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103"); - } - } - - // 合并到总结果,若存在重复key则覆盖 - if (meterInfos.ContainsKey(focusAddress)) - { - foreach (var kvp in meterHashs) - { - meterInfos[focusAddress][kvp.Key] = kvp.Value; - } - } - else - { - meterInfos[focusAddress] = meterHashs; - } - } - } - else - { - throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104"); - } - } - - return meterInfos; - } - - /// - /// Lua脚本批量获取缓存的表计信息 - /// - /// 表信息数据对象 - /// 采集频率对应的缓存Key集合 - /// 系统类型 - /// 服务器标识 - /// 采集频率,1分钟、5分钟、15分钟 - /// 表计类型 - /// - protected async Task> GetMeterRedisCacheListData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class - { - if (redisKeys == null || redisKeys.Length <= 0 || - string.IsNullOrWhiteSpace(systemType) || - string.IsNullOrWhiteSpace(serverTagName) || - string.IsNullOrWhiteSpace(timeDensity)) - { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101"); - } - - var meterInfos = new List(); - var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - - // 分页参数:每页10000个键 - int pageSize = 10000; - int totalPages = (int)Math.Ceiling(redisKeys.Length / (double)pageSize); - - for (int page = 0; page < totalPages; page++) - { - // 分页获取当前批次键 - var batchKeys = redisKeys - .Skip(page * pageSize) - .Take(pageSize) - .ToArray(); - - // 执行Lua脚本获取当前页数据 - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, batchKeys); - if (merterResult == null) - { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102"); - } - - // 解析当前页结果 - if (merterResult is object[] arr) - { - foreach (object[] item in arr) - { - string key = (string)item[0]; - object[] fieldsAndValues = (object[])item[1]; - var redisCacheKey = string.Format( - RedisConst.CacheMeterInfoHashKey, - systemType, - serverTagName, - meterType, - timeDensity - ); - string focusAddress = key.Replace(redisCacheKey, ""); - - for (int i = 0; i < fieldsAndValues.Length; i += 2) - { - string meterId = (string)fieldsAndValues[i]; - string meterStr = (string)fieldsAndValues[i + 1]; - - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) - { - meterInfo = meterStr.Deserialize()!; - } - if (meterInfo != null) - { - meterInfos.Add(meterInfo); - } - else - { - throw new Exception( - $"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}),-103" - ); - } - } - } - } - else - { - throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104"); - } - } - - return meterInfos; } } diff --git a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 1feb894..69c986a 100644 --- a/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/services/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -94,6 +94,6 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); - _= dbContext.InitAmmeterCacheData(); + await dbContext.InitAmmeterCacheData(); } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs index 5aec2e1..d73a174 100644 --- a/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs +++ b/services/JiShe.CollectBus.Application/DataChannels/DataChannelManageService.cs @@ -72,7 +72,7 @@ namespace JiShe.CollectBus.DataChannels public async Task ScheduledMeterTaskReadingAsync( ChannelReader>> telemetryPacketInfoReader) { - const int BatchSize = 100000; + const int BatchSize = 50000; const int EmptyWaitMilliseconds = 50; var timeout = TimeSpan.FromSeconds(5); var timer = Stopwatch.StartNew(); @@ -89,7 +89,7 @@ namespace JiShe.CollectBus.DataChannels { if (timeoutMilliseconds > 0) { - _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 通道处理数据耗时{timeoutMilliseconds}毫秒"); + _logger.LogError($"{nameof(ScheduledMeterTaskReadingAsync)} 任务数据通道处理数据耗时{timeoutMilliseconds}毫秒"); } timeoutMilliseconds = 0; //无消息时短等待50毫秒 diff --git a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs index d648c89..cc64378 100644 --- a/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs +++ b/services/JiShe.CollectBus.Application/RedisDataCache/RedisDataCacheService.cs @@ -94,73 +94,11 @@ namespace JiShe.CollectBus.RedisDataCache /// 批量添加数据 /// /// - /// 主数据存储Hash缓存Key /// Set索引缓存Key - /// ZSET索引缓存Key + /// hash缓存Key /// 待缓存数据集合 /// public async Task BatchInsertDataAsync( - string redisHashCacheKey, - string redisSetIndexCacheKey, - string redisZSetScoresIndexCacheKey, - string redisDeviceInfoHashCacheKey, - IEnumerable items) where T : DeviceCacheBasicModel - { - if (items == null - || items.Count() <= 0 - || string.IsNullOrWhiteSpace(redisHashCacheKey) - || string.IsNullOrWhiteSpace(redisSetIndexCacheKey) - || string.IsNullOrWhiteSpace(redisZSetScoresIndexCacheKey) - || string.IsNullOrWhiteSpace(redisDeviceInfoHashCacheKey) - ) - { - _logger.LogError($"{nameof(BatchInsertDataAsync)} 参数异常,-101"); - return; - } - - const int BATCH_SIZE = 1000; // 每批1000条 - var semaphore = new SemaphoreSlim(Environment.ProcessorCount * 2); - - foreach (var batch in items.Batch(BATCH_SIZE)) - { - await semaphore.WaitAsync(); - - _ = Task.Run(() => - { - using (var pipe = Instance.StartPipe()) - { - foreach (var item in batch) - { - // 主数据存储Hash - pipe.HSet(redisHashCacheKey, item.MemberId, item.Serialize()); - - // Set索引缓存 - pipe.SAdd(redisSetIndexCacheKey, $"{item.TimeDensity.ToString().PadLeft(2, '0')}:{item.FocusAddress}"); - - // ZSET索引缓存Key - pipe.ZAdd(redisZSetScoresIndexCacheKey, item.ScoreValue, item.MemberId); - - //设备信息缓存 - pipe.HSet(redisDeviceInfoHashCacheKey, item.FocusAddress, item.Serialize()); - - } - pipe.EndPipe(); - } - semaphore.Release(); - }); - } - - await Task.CompletedTask; - } - - /// - /// 批量添加数据 - /// - /// - /// Set索引缓存Key - /// 待缓存数据集合 - /// - public async Task BatchInsertDataAsync2( string redisSetIndexCacheKey, string redisDeviceInfoHashCacheKey, Dictionary> items) where T : DeviceCacheBasicModel @@ -697,7 +635,167 @@ namespace JiShe.CollectBus.RedisDataCache PageSize = pageSize, }; } - + + + /// + /// 通过ZSET索引获取数据,支持10万级别数据处理,控制在13秒以内。 + /// + /// + /// ZSET索引缓存Key + /// 主数据存储Hash缓存Key + /// 分页尺寸 + /// 最后一个索引 + /// 最后一个唯一标识 + /// 排序方式 + /// + public async Task> GetAllPagedData2( + string redisCacheDeviceGroupSetIndexKey, + string redisCacheDeviceInfoHashKey, + int pageSize = 1000, + decimal? lastScore = null, + string lastMember = null, + bool descending = true) + where T : DeviceCacheBasicModel + { + // 参数校验增强 + if (string.IsNullOrWhiteSpace(redisCacheDeviceInfoHashKey) || + string.IsNullOrWhiteSpace(redisCacheDeviceGroupSetIndexKey)) + { + _logger.LogError($"{nameof(GetAllPagedData)} 参数异常,-101"); + return new BusCacheGlobalPagedResult { Items = new List() }; + } + + pageSize = Math.Clamp(pageSize, 1, 10000); + + var luaScript = @" + local command = ARGV[1] + local range_start = ARGV[2] + local range_end = ARGV[3] + local limit = tonumber(ARGV[4]) + local last_score = ARGV[5] + local last_member = ARGV[6] + + -- 获取原始数据 + local members + if command == 'ZRANGEBYSCORE' then + members = redis.call(command, KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) + else + members = redis.call('ZREVRANGEBYSCORE', KEYS[1], range_start, range_end, 'WITHSCORES', 'LIMIT', 0, limit * 2) + end + + -- 过滤数据 + local filtered_members = {} + local count = 0 + for i = 1, #members, 2 do + local member = members[i] + local score = members[i+1] + local include = true + if last_score ~= '' and last_member ~= '' then + if command == 'ZRANGEBYSCORE' then + -- 升序:score > last_score 或 (score == last_score 且 member > last_member) + if score == last_score then + include = member > last_member + else + include = tonumber(score) > tonumber(last_score) + end + else + -- 降序:score < last_score 或 (score == last_score 且 member < last_member) + if score == last_score then + include = member < last_member + else + include = tonumber(score) < tonumber(last_score) + end + end + end + if include then + table.insert(filtered_members, member) + table.insert(filtered_members, score) + count = count + 1 + if count >= limit then + break + end + end + end + + -- 提取有效数据 + local result_members, result_scores = {}, {} + for i=1,#filtered_members,2 do + table.insert(result_members, filtered_members[i]) + table.insert(result_scores, filtered_members[i+1]) + end + + if #result_members == 0 then return {0,{},{},{}} end + + -- 获取Hash数据 + local hash_data = redis.call('HMGET', KEYS[2], unpack(result_members)) + return {#result_members, result_members, result_scores, hash_data}"; + + // 调整范围构造逻辑(移除排他符号) + string rangeStart, rangeEnd; + if (descending) + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "+inf"; + rangeEnd = "-inf"; + } + else + { + rangeStart = lastScore.HasValue ? lastScore.Value.ToString() : "-inf"; + rangeEnd = "+inf"; + } + + var scriptResult = (object[])await Instance.EvalAsync(luaScript, + new[] { redisCacheDeviceGroupSetIndexKey, redisCacheDeviceInfoHashKey }, + new object[] + { + descending ? "ZREVRANGEBYSCORE" : "ZRANGEBYSCORE", + rangeStart, + rangeEnd, + (pageSize + 1).ToString(), // 获取pageSize+1条以判断是否有下一页 + lastScore?.ToString() ?? "", + lastMember ?? "" + }); + + if ((long)scriptResult[0] == 0) + return new BusCacheGlobalPagedResult { Items = new List() }; + + // 处理结果集 + var members = ((object[])scriptResult[1]).Cast().ToList(); + var scores = ((object[])scriptResult[2]).Cast().Select(decimal.Parse).ToList(); + var hashData = ((object[])scriptResult[3]).Cast().ToList(); + + var validItems = members.AsParallel() + .Select((m, i) => + { + try { return BusJsonSerializer.Deserialize(hashData[i]); } + catch { return null; } + }) + .Where(x => x != null) + .ToList(); + + var hasNext = validItems.Count > pageSize; + var actualItems = hasNext ? validItems.Take(pageSize) : validItems; + + //分页锚点索引 + decimal? nextScore = null; + string nextMember = null; + if (hasNext && actualItems.Any()) + { + var lastIndex = actualItems.Count() - 1; // 使用actualItems的最后一个索引 + nextScore = scores[lastIndex]; + nextMember = members[lastIndex]; + } + + return new BusCacheGlobalPagedResult + { + Items = actualItems.ToList(), + HasNext = hasNext, + NextScore = nextScore, + NextMember = nextMember, + TotalCount = await GetTotalCount(redisCacheDeviceGroupSetIndexKey), + PageSize = pageSize, + }; + } + /// /// 游标分页查询 diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index e4c61d6..8d9ac21 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -350,22 +350,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS //// 初始化缓存 //DeviceGroupBalanceControl.InitializeCache(deviceList); - var timeDensity = "15"; - //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; + //var timeDensity = "15"; + ////获取缓存中的电表信息 + //var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoHashKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; - var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); - List focusAddressDataLista = new List(); - foreach (var item in meterInfos) - { - focusAddressDataLista.Add(item.FocusAddress); - } + //var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + //var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); + //List focusAddressDataLista = new List(); + //foreach (var item in meterInfos) + //{ + // focusAddressDataLista.Add(item.FocusAddress); + //} - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - // 打印分布统计 - DeviceGroupBalanceControl.PrintDistributionStats(); + //// 打印分布统计 + //DeviceGroupBalanceControl.PrintDistributionStats(); await Task.CompletedTask; } @@ -392,46 +392,46 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [HttpGet] public async Task TestRedisCacheGetAllPagedData() { - var timeDensity = "15"; - string SystemType = "Energy"; - string ServerTagName = "JiSheCollectBus2"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var timeDensity = "15"; + //string SystemType = "Energy"; + //string ServerTagName = "JiSheCollectBus2"; + //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var timer1 = Stopwatch.StartNew(); - decimal? cursor = null; - string member = null; - bool hasNext; - List meterInfos = new List(); - do - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //var timer1 = Stopwatch.StartNew(); + //decimal? cursor = null; + //string member = null; + //bool hasNext; + //List meterInfos = new List(); + //do + //{ + // var page = await _redisDataCacheService.GetAllPagedData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - meterInfos.AddRange(page.Items); - cursor = page.HasNext ? page.NextScore : null; - member = page.HasNext ? page.NextMember : null; - hasNext = page.HasNext; - } while (hasNext); + // meterInfos.AddRange(page.Items); + // cursor = page.HasNext ? page.NextScore : null; + // member = page.HasNext ? page.NextMember : null; + // hasNext = page.HasNext; + //} while (hasNext); - timer1.Stop(); - _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); + //timer1.Stop(); + //_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - List focusAddressDataLista = new List(); - foreach (var item in meterInfos) - { - focusAddressDataLista.Add(item.FocusAddress); - } + //List focusAddressDataLista = new List(); + //foreach (var item in meterInfos) + //{ + // focusAddressDataLista.Add(item.FocusAddress); + //} - DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); - // 打印分布统计 - DeviceGroupBalanceControl.PrintDistributionStats(); + //// 打印分布统计 + //DeviceGroupBalanceControl.PrintDistributionStats(); await Task.CompletedTask; } @@ -541,22 +541,22 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS [HttpGet] public async Task TestRedisCacheGetData(string scores) { - var timeDensity = "15"; - string SystemType = "Energy"; - string ServerTagName = "JiSheCollectBus5"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var timeDensity = "15"; + //string SystemType = "Energy"; + //string ServerTagName = "JiSheCollectBus5"; + //var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; + //var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var page = await _redisDataCacheService.GetSingleData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - "973219481:17", - pageSize: 1000, - lastScore: 100, - lastMember: "memberId", - descending: true - ); + //var page = await _redisDataCacheService.GetSingleData( + // redisCacheMeterInfoHashKeyTemp, + // redisCacheMeterInfoZSetScoresIndexKeyTemp, + // "973219481:17", + // pageSize: 1000, + // lastScore: 100, + // lastMember: "memberId", + // descending: true + // ); await Task.CompletedTask; } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 8e02627..2991fe8 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -15,11 +15,12 @@ using JiShe.CollectBus.IoTDB.Model; using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.IoTDB.Provider; using JiShe.CollectBus.IotSystems.Ammeters; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MeterReadingRecords; -using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Models; +using Mapster; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -140,7 +141,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //_logger.LogInformation($"{nameof(AmmeterScheduledAutomaticVerificationTime)} 电表自动校时,非自动校时时间"); //return; - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTime, meterType: MeterTypeEnum.Ammeter, @@ -158,7 +159,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTerminalVersionTime, StringComparison.CurrentCultureIgnoreCase))//集中器版本号读取 { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTime, meterType: MeterTypeEnum.Ammeter, @@ -175,7 +176,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//SIM卡读取 { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTime, meterType: MeterTypeEnum.Ammeter, @@ -192,7 +193,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticTelematicsModuleTime, StringComparison.CurrentCultureIgnoreCase))//月冻结 { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTime, meterType: MeterTypeEnum.Ammeter, @@ -209,7 +210,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else if (string.Equals(currentTimeStr, _applicationOptions.AutomaticDayFreezeTime, StringComparison.CurrentCultureIgnoreCase))//日冻结 { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTime, meterType: MeterTypeEnum.Ammeter, @@ -243,7 +244,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading if (meteryType == MeterTypeEnum.Ammeter.ToString()) { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.Ammeter, @@ -261,7 +262,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading else if (meteryType == MeterTypeEnum.WaterMeter.ToString()) { - _ = CreateMeterPublishTask( + _ = CreateMeterPublishTask( timeDensity: timeDensity, nextTaskTime: currentTaskTime, meterType: MeterTypeEnum.WaterMeter, @@ -310,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集端Code /// - public virtual Task> GetAmmeterInfoList(string gatherCode = "") + public virtual Task> GetAmmeterInfoList(string gatherCode = "") { throw new NotImplementedException($"{nameof(GetAmmeterInfoList)}请根据不同系统类型进行实现"); } @@ -322,201 +323,175 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - //return; + //return; - // 创建取消令牌源 - //var cts = new CancellationTokenSource(); - - _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader ); - - //此处代码不要删除 -#if DEBUG - var timeDensity = "15"; - var serverTagName = "JiSheCollectBus2"; - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, serverTagName, MeterTypeEnum.Ammeter, timeDensity)}"; - - List meterInfos = new List(); - List focusAddressDataLista = new List(); - var timer1 = Stopwatch.StartNew(); - - var allIds = new HashSet(); - decimal? score = null; - string member = null; - - while (true) + try { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: score, - lastMember: member); + // 创建取消令牌源 + //var cts = new CancellationTokenSource(); - meterInfos.AddRange(page.Items); - focusAddressDataLista.AddRange(page.Items.Select(d => $"{d.MeterId}")); - foreach (var item in page.Items) + _ = _dataChannelManage.ScheduledMeterTaskReadingAsync(DataChannelManage.TaskDataChannel.Reader); + + //此处代码不要删除 +#if DEBUG + var redisCacheDeviceInfoHashKeyTemp = $"CollectBus:Energy:JiSheCollectBus2:DeviceInfo"; + + var timer1 = Stopwatch.StartNew(); + Dictionary> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll>(redisCacheDeviceInfoHashKeyTemp); + List meterInfos = new List(); + List focusAddressDataLista = new List(); + foreach (var item in keyValuePairsTemps) { - if (!allIds.Add(item.MemberId)) + foreach (var subItem in item.Value) { - _logger.LogError($"{item.MemberId}Duplicate data found!"); + if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15) + { + meterInfos.Add(subItem); + focusAddressDataLista.Add(subItem.MeterId.ToString()); + } } } - if (!page.HasNext) break; - score = page.NextScore; - member = page.NextMember; - } - timer1.Stop(); - _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); - //return; + timer1.Stop(); + _logger.LogError($"电表初始化读取数据总共花费时间{timer1.ElapsedMilliseconds}毫秒"); + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); + return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); -#endif - if (meterInfos == null || meterInfos.Count <= 0) - { - throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); - } - - //获取采集项类型数据 - var gatherItemInfos = await GetGatherItemByDataTypes(); - if (gatherItemInfos == null || gatherItemInfos.Count <= 0) - { - throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); - } - var timer = Stopwatch.StartNew(); - - List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 - - //根据采集频率分组,获得采集频率分组 - var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - var currentTaskTime = DateTime.Now; - if (_applicationOptions.FirstCollectionTime.HasValue == false) - { - _applicationOptions.FirstCollectionTime = currentTaskTime; - } - //先处理采集频率任务缓存 - foreach (var item in meterInfoGroupByTimeDensity) - { - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() +#endif + if (meterInfos == null || meterInfos.Count <= 0) { - LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), - TimeDensity = item.Key, - }; - nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key); - //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 + _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); + return; + } - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); - await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); - } - string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}"; - Dictionary> keyValuePairs = new Dictionary>(); + _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,读取数据成功"); - foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) - { - var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - List ammeterInfos = new List(); - //将表计信息根据集中器分组,获得集中器号 - var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); - foreach (var item in meterInfoGroup) + //获取采集项类型数据 + var gatherItemInfos = await GetGatherItemByDataTypes(); + if (gatherItemInfos == null || gatherItemInfos.Count <= 0) { - if (string.IsNullOrWhiteSpace(item.Key))//集中器号为空,跳过 - { - continue; - } + _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); + return; + } + var timer = Stopwatch.StartNew(); - foreach (var ammeter in item) - { - deviceIds.Add(ammeter.MeterId.ToString()); + List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 - //处理ItemCode - if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) + //根据采集频率分组,获得采集频率分组 + var meterInfoGroupByTimeDensity = meterInfos.Select(d => d.TimeDensity).GroupBy(d => d); + var currentTaskTime = DateTime.Now; + if (_applicationOptions.FirstCollectionTime.HasValue == false) + { + _applicationOptions.FirstCollectionTime = currentTaskTime; + } + //先处理采集频率任务缓存 + foreach (var item in meterInfoGroupByTimeDensity) + { + TasksToBeIssueModel nextTask = new TasksToBeIssueModel() + { + LastTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key), + TimeDensity = item.Key, + }; + nextTask.NextTaskTime = nextTask.LastTaskTime.CalculateNextCollectionTime(item.Key); + //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 + + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, item.Key); + await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); + } + + //设备hash缓存key + string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}"; + + //设备分组集合key + string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}"; + + Dictionary> keyValuePairs = new Dictionary>(); + + //处理设备缓存信息 + foreach (var ammeter in meterInfos) + { + deviceIds.Add(ammeter.MeterId.ToString()); + + //处理ItemCode + if (string.IsNullOrWhiteSpace(ammeter.ItemCodes) && !string.IsNullOrWhiteSpace(ammeter.DataTypes)) + { + var itemArr = ammeter.DataTypes.Split(',').ToList(); + + #region 拼接采集项 + List itemCodeList = new List(); + foreach (var dataType in itemArr) { - var itemArr = ammeter.DataTypes.Split(',').ToList(); - - #region 拼接采集项 - List itemCodeList = new List(); - foreach (var dataType in itemArr) + var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表 + var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType)); + if (gatherItem != null) { - var excludeItemCode = "10_98,10_94";//TODO 排除透明转发:尖峰平谷时段、跳合闸,特殊电表 - var gatherItem = gatherItemInfos.FirstOrDefault(f => f.DataType.Equals(dataType)); - if (gatherItem != null) + if (!excludeItemCode.Contains(gatherItem.ItemCode)) { - if (!excludeItemCode.Contains(gatherItem.ItemCode)) - { - itemCodeList.Add(gatherItem.ItemCode); - } + itemCodeList.Add(gatherItem.ItemCode); } + } - #region 特殊电表采集项编号处理 - if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS - { - itemCodeList.Add("10_95"); - } - if (itemArr.Exists(e => e.Equals("109")))//WAVE_109 - { - itemCodeList.Add("10_109"); - } - #endregion + #region 特殊电表采集项编号处理 + if (itemArr.Exists(e => e.Equals("95"))) //德力西DTS + { + itemCodeList.Add("10_95"); + } + if (itemArr.Exists(e => e.Equals("109")))//WAVE_109 + { + itemCodeList.Add("10_109"); } #endregion - - - - ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串 - - if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes)) - { - ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109"); - } } + #endregion - ammeterInfos.Add(ammeter); - if (!keyValuePairs.ContainsKey(ammeter.FocusAddress)) + + ammeter.ItemCodes = itemCodeList.Serialize();//转换成JSON字符串 + + if (!string.IsNullOrWhiteSpace(ammeter.ItemCodes)) { - keyValuePairs[ammeter.FocusAddress] = new List() {ammeter }; - } - else - { - keyValuePairs[ammeter.FocusAddress].Add(ammeter); + ammeter.ItemCodes = ammeter.ItemCodes.Replace("WAVE_109", "10_109"); } } + + if (!keyValuePairs.ContainsKey(ammeter.FocusAddress)) + { + keyValuePairs[ammeter.FocusAddress] = new List() { ammeter.Adapt() }; + } + else + { + keyValuePairs[ammeter.FocusAddress].Add(ammeter.Adapt()); + } } - //await _redisDataCacheService.BatchInsertDataAsync2( - // redisCacheMeterInfoSetIndexKey, - // redisDeviceInfoHashCacheKey, - // keyValuePairs); + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheDeviceGroupSetIndexKey, + redisCacheDeviceInfoHashKey, + keyValuePairs); - await _redisDataCacheService.BatchInsertDataAsync( - redisCacheMeterInfoHashKey, - redisCacheMeterInfoSetIndexKey, - redisCacheMeterInfoZSetScoresIndexKey, - redisDeviceInfoHashCacheKey, - ammeterInfos); + //初始化设备组负载控制 + if (deviceIds == null || deviceIds.Count <= 0) + { + _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); + + } + else + { + DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); + } + + timer.Stop(); + + _logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒"); } - - //初始化设备组负载控制 - if (deviceIds == null || deviceIds.Count <= 0) + catch (Exception ex) { - _logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息"); - + _logger.LogError(ex, $"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据异常"); + throw ex; } - else - { - DeviceGroupBalanceControl.InitializeCache(deviceIds, _kafkaOptions.NumPartitions); - } - - timer.Stop(); - - _logger.LogWarning($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成,耗时{timer.ElapsedMilliseconds}毫秒"); } /// @@ -636,7 +611,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - private async Task> AmmerterCreatePublishTaskAction(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + private async Task> AmmerterCreatePublishTaskAction(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; @@ -808,7 +783,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - public virtual async Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + public virtual async Task> AmmeterScheduledAutomaticVerificationTime(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm:00}"; @@ -842,7 +817,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ItemCode = itemCode, SubProtocolRequest = new SubProtocolBuildRequest() { - MeterAddress = ammeterInfo.AmmerterAddress, + MeterAddress = ammeterInfo.MeterAddress, Password = ammeterInfo.Password, ItemCode = subItemCode, } @@ -884,7 +859,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - public virtual async Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + public virtual async Task> AmmeterScheduledGetAutomaticDayFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm:00}"; @@ -954,7 +929,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - public virtual async Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + public virtual async Task> AmmeterScheduledGetAutomaticMonthFreezeData(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm:00}"; @@ -1093,7 +1068,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// 采集端Code /// - public virtual Task> GetWatermeterInfoList(string gatherCode = "") + public virtual Task> GetWatermeterInfoList(string gatherCode = "") { throw new NotImplementedException($"{nameof(GetWatermeterInfoList)}请根据不同系统类型进行实现"); } @@ -1121,7 +1096,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading List deviceIds = new List();//用于处理Kafka主题分区数据的分发和处理。 //根据采集频率分组,获得采集频率分组 - var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); + var meterInfoGroupByTimeDensity = meterInfos.Select(d=>d.TimeDensity).GroupBy(d => d); var currentTime = DateTime.Now; if (_applicationOptions.FirstCollectionTime.HasValue == false) { @@ -1144,40 +1119,35 @@ namespace JiShe.CollectBus.ScheduledMeterReading var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.WaterMeter, item.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } - string redisDeviceInfoHashCacheKey = $"{string.Format(RedisConst.DeviceInfoHashCacheKey, SystemType, ServerTagName)}"; - foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) + + //设备hash缓存key + string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}"; + + //设备分组集合key + string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}"; + + Dictionary> keyValuePairs = new Dictionary>(); + foreach (var subItem in meterInfos) { - List watermeterInfo = new List(); + deviceIds.Add(subItem.MeterId.ToString()); - //将表计信息根据集中器分组,获得集中器号 - var meterInfoGroup = itemTimeDensity.GroupBy(x => x.FocusAddress).ToList(); - foreach (var item in meterInfoGroup) + if (!keyValuePairs.ContainsKey(subItem.FocusAddress)) { - if (string.IsNullOrWhiteSpace(item.Key)) - { - continue; - } - - - var redisCacheMeterInfoHashKey = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoSetIndexKey = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - var redisCacheMeterInfoZSetScoresIndexKey = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}"; - - foreach (var subItem in item) - { - deviceIds.Add(subItem.MeterId.ToString()); - - watermeterInfo.Add(subItem); - } - - await _redisDataCacheService.BatchInsertDataAsync( - redisCacheMeterInfoHashKey, - redisCacheMeterInfoSetIndexKey, - redisDeviceInfoHashCacheKey, - redisCacheMeterInfoZSetScoresIndexKey, watermeterInfo); + keyValuePairs[subItem.FocusAddress] = new List() { subItem }; + } + else + { + keyValuePairs[subItem.FocusAddress].Add(subItem); } } + + + await _redisDataCacheService.BatchInsertDataAsync( + redisCacheDeviceGroupSetIndexKey, + redisCacheDeviceInfoHashKey, + keyValuePairs); + //初始化设备组负载控制 if (deviceIds == null || deviceIds.Count <= 0) { @@ -1241,7 +1211,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 时间格式的任务批次名称 /// private async Task> WatermeterCreatePublishTaskAction(int timeDensity - , WatermeterInfo watermeter, int groupIndex, DateTime timestamps) + , DeviceInfo watermeter, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; @@ -1270,7 +1240,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据表型号获取协议插件 - var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.Code); + var protocolPlugin = await _protocolService.GetProtocolServiceAsync(watermeter.BrandType); if (protocolPlugin == null) { //_logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 定时阀控运行时间{currentTime}没有找到对应的协议组件,-105"); @@ -1351,7 +1321,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - public virtual async Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + public virtual async Task> ConcentratorScheduledAutomaticGetTerminalVersion(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm:00}"; @@ -1392,14 +1362,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading }); var meterReadingRecords = CreateAmmeterPacketInfo( - ammeterInfo: ammeterInfo, - timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), - builderResponse: builderResponse, - itemCode: itemCode, - subItemCode: null, - pendingCopyReadTime: currentTime, - creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.TerminalVersion); + ammeterInfo: ammeterInfo, + timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), + builderResponse: builderResponse, + itemCode: itemCode, + subItemCode: null, + pendingCopyReadTime: currentTime, + creationTime: currentTime, + packetType: TelemetryPacketTypeEnum.TerminalVersion); + taskList.Add(meterReadingRecords); if (taskList == null || taskList.Count <= 0) @@ -1425,7 +1396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 集中器所在分组 /// 采集频率对应的时间戳 /// - public virtual async Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, AmmeterInfo ammeterInfo, int groupIndex, DateTime timestamps) + public virtual async Task> ConcentratorScheduledAutomaticGetTelematicsModule(int timeDensity, DeviceInfo ammeterInfo, int groupIndex, DateTime timestamps) { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm:00}"; @@ -1459,14 +1430,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading }); var meterReadingRecords = CreateAmmeterPacketInfo( - ammeterInfo: ammeterInfo, - timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), - builderResponse: builderResponse, - itemCode: itemCode, - subItemCode: null, - pendingCopyReadTime: currentTime, - creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.TelematicsModule); + ammeterInfo: ammeterInfo, + timestamps: currentTime.GetDateTimeOffset().ToUnixTimeNanoseconds(), + builderResponse: builderResponse, + itemCode: itemCode, + subItemCode: null, + pendingCopyReadTime: currentTime, + creationTime: currentTime, + packetType: TelemetryPacketTypeEnum.TelematicsModule); + taskList.Add(meterReadingRecords); if (taskList == null || taskList.Count <= 0) @@ -1517,32 +1489,34 @@ namespace JiShe.CollectBus.ScheduledMeterReading var timer = Stopwatch.StartNew(); //获取对应频率中的所有电表信息 - var redisCacheMeterInfoHashKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoHashKey, SystemType, ServerTagName, meterType, timeDensity)}"; - var redisCacheMeterInfoSetIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoSetIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; - var redisCacheMeterInfoZSetScoresIndexKeyTemp = $"{string.Format(RedisConst.CacheMeterInfoZSetScoresIndexKey, SystemType, ServerTagName, meterType, timeDensity)}"; + //设备hash缓存key + string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}"; - List meterInfos = new List(); - decimal? cursor = null; - string member = null; + //设备分组集合key + string redisCacheDeviceGroupSetIndexKey = $"{string.Format(RedisConst.CacheDeviceGroupSetIndexKey, SystemType, ServerTagName)}"; - while (true) - { - var page = await _redisDataCacheService.GetAllPagedData( - redisCacheMeterInfoHashKeyTemp, - redisCacheMeterInfoZSetScoresIndexKeyTemp, - pageSize: 1000, - lastScore: cursor, - lastMember: member); + //List meterInfos = new List(); + //decimal? cursor = null; + //string member = null; - meterInfos.AddRange(page.Items); - if (!page.HasNext) - { - break; - } + //while (true) + //{ + // var page = await _redisDataCacheService.GetAllPagedData2( + // redisCacheDeviceGroupSetIndexKey, + // redisCacheDeviceInfoHashKey, + // pageSize: 1000, + // lastScore: cursor, + // lastMember: member); - cursor = page.NextScore; - member = page.NextMember; - } + // meterInfos.AddRange(page.Items); + // if (!page.HasNext) + // { + // break; + // } + + // cursor = page.NextScore; + // member = page.NextMember; + //} //var page = await _redisDataCacheService.GetAllPagedData( // redisCacheMeterInfoHashKeyTemp, @@ -1552,16 +1526,32 @@ namespace JiShe.CollectBus.ScheduledMeterReading // lastMember: member); //meterInfos.AddRange(page.Items); - if (meterInfos == null || meterInfos.Count <= 0) - { - timer.Stop(); - _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); - return; - } + //if (meterInfos == null || meterInfos.Count <= 0) + //{ + // timer.Stop(); + // _logger.LogError($"{nameof(CreateMeterPublishTask)} {meterType}的{timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); + // return; + //} + + Dictionary> keyValuePairs = FreeRedisProvider.Instance.HGetAll>(redisCacheDeviceInfoHashKey); + timer.Stop(); _logger.LogError($"{nameof(CreateMeterPublishTask)} 构建采集待下发任务,缓存获取信息共花费{timer.ElapsedMilliseconds}毫秒"); + + List meterInfos = new List(); + foreach (var item in keyValuePairs) + { + foreach (var subItem in item.Value) + { + if (subItem.MeterType == meterType && subItem.TimeDensity == timeDensity) + { + meterInfos.Add(subItem); + } + } + } + timer.Restart(); await DeviceGroupBalanceControl.ProcessWithThrottleAsync( @@ -1652,7 +1642,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 数据创建时间戳 /// 数据包类型 /// - protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(AmmeterInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) + protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) { try { @@ -1667,7 +1657,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading DatabaseBusiID = ammeterInfo.DatabaseBusiID, PendingCopyReadTime = pendingCopyReadTime, CreationTime = creationTime, - MeterAddress = ammeterInfo.AmmerterAddress, + MeterAddress = ammeterInfo.MeterAddress, PacketType = (int)packetType, AFN = builderResponse.AFn, Fn = builderResponse.Fn, diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index d652722..57eb3ec 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -12,6 +12,7 @@ using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDB.Context; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.Ammeters; +using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Kafka.Internal; @@ -39,7 +40,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading string systemType = string.Empty; string serverTagName = string.Empty; private readonly ILogger _logger; - private readonly IIoTDbProvider _dbProvider; private readonly IProtocolService _protocolService; public EnergySystemScheduledMeterReadingService( @@ -62,7 +62,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading { serverTagName = applicationOptions.Value.ServerTagName; systemType = applicationOptions.Value.SystemType; - _dbProvider = dbProvider; _logger = logger; _protocolService = protocolService; } @@ -97,59 +96,93 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// //[HttpGet] //[Route($"ammeter/list")] - public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") + public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") { - List ammeterInfos = new List(); - ammeterInfos.Add(new AmmeterInfo() + //List ammeterInfos = new List(); + //ammeterInfos.Add(new DeviceInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "402440506", + // Name = "张家祠工务(三相电表)", + // FocusId = 95780, + // DatabaseBusiID = 1, + // MeteringCode = 1, + // MeterAddress = "402410040506", + // MeterId = 127035, + // TypeName = 3, + // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", + // TimeDensity = 15, + // BrandType = "DDS1980", + //}); + + //ammeterInfos.Add(new DeviceInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "542400504", + // Name = "五号配(长芦二所四排)(单相电表)", + // FocusId = 69280, + // DatabaseBusiID = 1, + // MeteringCode = 2, + // MeterAddress = "542410000504", + // MeterId = 95594, + // TypeName = 1, + // DataTypes = "581,589,592,597,601", + // TimeDensity = 15, + // BrandType = "DDS1980", + //}); + + //return ammeterInfos; + + try { - Baudrate = 2400, - FocusAddress = "402440506", - Name = "张家祠工务(三相电表)", - FocusId = 95780, - DatabaseBusiID = 1, - MeteringCode = 1, - AmmerterAddress = "402410040506", - MeterId = 127035, - TypeName = 3, - DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", - TimeDensity = 15, - BrandType = "DDS1980", - }); + string sql = $@"SELECT + C.ID as MeterId, + C.Name, + C.FocusID as FocusId, + C.SingleRate, + C.MeteringCode, + C.Code AS BrandType, + C.Baudrate, + C.Password, + C.MeteringPort, + C.[Address] AS MeterAddress, + C.TypeName, + C.Protocol, + C.TripState, + C.[State], + B.[Address], + B.AreaCode, + B.AutomaticReport, + D.DataTypes, + B.TimeDensity, + A.GatherCode, + C.Special, + C.[ProjectID], + B.AbnormalState, + B.LastTime, + 1 as MeterType, + CONCAT(B.AreaCode, B.[Address]) AS FocusAddress, + (select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID + FROM TB_GatherInfo(NOLOCK) AS A + INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 + INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 + INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0 + WHERE 1=1 and C.Special = 0 "; + //TODO 记得移除特殊表过滤 - ammeterInfos.Add(new AmmeterInfo() - { - Baudrate = 2400, - FocusAddress = "542400504", - Name = "五号配(长芦二所四排)(单相电表)", - FocusId = 69280, - DatabaseBusiID = 1, - MeteringCode = 2, - AmmerterAddress = "542410000504", - MeterId = 95594, - TypeName = 1, - DataTypes = "581,589,592,597,601", - TimeDensity = 15, - BrandType = "DDS1980", - }); - - return ammeterInfos; - - string sql = $@"SELECT C.ID as MeterId,C.Name,C.FocusID as FocusId,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID - FROM TB_GatherInfo(NOLOCK) AS A - INNER JOIN TB_FocusInfo(NOLOCK) AS B ON A.ID = B.GatherInfoID AND B.RemoveState >= 0 AND B.State>=0 - INNER JOIN TB_AmmeterInfo(NOLOCK) AS C ON B.ID = C.FocusID AND C.State>= 0 AND C.State<100 - INNER JOIN TB_AmmeterGatherItem(NOLOCK) AS D ON C.ID = D.AmmeterID AND D.State>=0 - WHERE 1=1 and C.Special = 0 "; - //TODO 记得移除特殊表过滤 - - if (!string.IsNullOrWhiteSpace(gatherCode)) - { - sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + if (!string.IsNullOrWhiteSpace(gatherCode)) + { + sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + } + return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + .Ado + .QueryAsync(sql); + } + catch (Exception ex) + { + throw ex; } - return await SqlProvider.Instance.Change(DbEnum.EnergyDB) - .Ado - .QueryAsync(sql); } @@ -160,7 +193,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public override async Task> GetAmmeterAutoValveControlSetting(string currentTime) { - string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId + try + { + string sql = $@"SELECT A.*,B.AmmeterID as MeterId,E.GatherCode,D.AreaCode,D.[Address],CONCAT(D.AreaCode, D.[Address]) AS FocusAddress,D.ID as FocusId FROM TB_AutoTripTask(nolock) AS A INNER JOIN TB_AutoTripAmmeter(nolock) AS B ON A.ID=B.TripTaskID INNER JOIN TB_AmmeterInfo(nolock) AS C ON B.AmmeterID=C.ID @@ -168,14 +203,20 @@ namespace JiShe.CollectBus.ScheduledMeterReading INNER JOIN TB_GatherInfo(NOLOCK) AS E ON E.ID=D.GatherInfoID WHERE A.IsForbid=0 and A.State<>-1 and E.GatherCode LIKE '%V4%' "; - if (!string.IsNullOrWhiteSpace(currentTime)) - { - sql = $@"{sql} AND A.TripTime = '{currentTime}'"; - } + if (!string.IsNullOrWhiteSpace(currentTime)) + { + sql = $@"{sql} AND A.TripTime = '{currentTime}'"; + } - return await SqlProvider.Instance.Change(DbEnum.EnergyDB) - .Ado - .QueryAsync(sql); + return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + .Ado + .QueryAsync(sql); + } + catch (Exception) + { + + throw; + } } @@ -200,7 +241,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //批量获取对应的缓存电表信息 - var ammeterInfos = new List(); + var ammeterInfos = new List(); List taskList = new List(); foreach (var settingInfo in settingInfos) @@ -270,7 +311,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ItemCode = itemCode, SubProtocolRequest = new SubProtocolBuildRequest() { - MeterAddress = ammeterInfo.AmmerterAddress, + MeterAddress = ammeterInfo.MeterAddress, Password = ammeterInfo.Password, ItemCode = subItemCode, } @@ -311,9 +352,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// //[HttpGet] //[Route($"ammeter/list")] - public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") + public override async Task> GetWatermeterInfoList(string gatherCode = "V4-Gather-8890") { - string sql = $@"SELECT + try + { + string sql = $@"SELECT A.ID as MeterId, A.Name, A.FocusID as FocusId, @@ -340,6 +383,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading A.[ProjectID], B.AbnormalState, B.LastTime, + 2 as MeterType, CONCAT(B.AreaCode, B.[Address]) AS FocusAddress, (select top 1 DatabaseBusiID from TB_Project where ID = b.ProjectID) AS DatabaseBusiID FROM [dbo].[TB_WatermeterInfo](NOLOCK) AS A @@ -347,13 +391,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading INNER JOIN [dbo].[TB_GatherInfo](NOLOCK) AS C ON B.GatherInfoID=C.ID WHERE A.State>=0 AND A.State<100 "; - if (!string.IsNullOrWhiteSpace(gatherCode)) - { - sql = $@"{sql} AND C.GatherCode= '{gatherCode}'"; + if (!string.IsNullOrWhiteSpace(gatherCode)) + { + sql = $@"{sql} AND C.GatherCode= '{gatherCode}'"; + } + return await SqlProvider.Instance.Change(DbEnum.EnergyDB) + .Ado + .QueryAsync(sql); + } + catch (Exception) + { + + throw; } - return await SqlProvider.Instance.Change(DbEnum.EnergyDB) - .Ado - .QueryAsync(sql); } } } \ No newline at end of file diff --git a/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs similarity index 80% rename from services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs rename to services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs index 26bc6aa..61fe6f5 100644 --- a/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceCacheInfo.cs +++ b/services/JiShe.CollectBus.Domain/IotSystems/Devices/DeviceInfo.cs @@ -13,9 +13,9 @@ using System.Threading.Tasks; namespace JiShe.CollectBus.IotSystems.Devices { /// - /// 设备缓存信息 + /// 设备信息 /// - public class DeviceCacheInfo : DeviceCacheBasicModel + public class DeviceInfo : DeviceCacheBasicModel { /// /// 关系映射标识,用于ZSet的Member字段和Set的Value字段,具体值可以根据不同业务场景进行定义 @@ -28,12 +28,36 @@ namespace JiShe.CollectBus.IotSystems.Devices /// [Column(IsIgnore = true)] public override long ScoreValue => Common.Helpers.CommonHelper.GetFocusScores(FocusAddress, MeteringCode); + + /// + /// 通讯方案: + /// NB-IOT常德水表、NB-IOT泽联电表、GPRS华立水表、 + /// RS-485、无线、载波 + /// + public string LinkType { get; set; } /// - /// 标记信息设备类型 + /// HaveValve: 是否带阀 (0 不带阀, 1 带阀) + /// 注意:NULL表示未设置 /// - [Column(IsIgnore = true)] - public MeterTypeEnum MeterType { get; set; } + public int? HaveValve { get; set; } + + /// + /// 设备类型: 水表\气表、流量计 + /// + public string MeterTypeName { get; set; } + + /// + /// 设备品牌; + /// (当 MeterType = 水表, 如 威铭、捷先 等) + /// (当 MeterType = 流量计, 如 西恩超声波流量计、西恩电磁流量计、涡街流量计 等) + /// + public string MeterBrand { get; set; } + + /// + /// 倍率 + /// + public decimal TimesRate { get; set; } /// /// 电表名称 @@ -74,9 +98,9 @@ namespace JiShe.CollectBus.IotSystems.Devices public int MeteringCode { get; set; } /// - /// 电表通信地址 + /// 表通信地址 /// - public string AmmerterAddress { get; set; } + public string MeterAddress { get; set; } /// /// 波特率 default(2400) diff --git a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs index be4ae4b..bb143ef 100644 --- a/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reflection; using System.Text; using System.Threading.Tasks; @@ -31,26 +32,12 @@ namespace JiShe.CollectBus.Common.Consts /// /// 设备信息缓存数据,{0}=>系统类型,{1}=>应用服务部署标记 /// - public const string DeviceInfoHashCacheKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo"; - - - public const string MeterInfo = "MeterInfo"; + public const string CacheDeviceInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceInfo"; /// - /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 + /// 设备信息缓存Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记 /// - public const string CacheMeterInfoHashKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:{"{3}"}"; - - /// - /// 缓存表计信息索引Set缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 - /// - public const string CacheMeterInfoSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:SetIndex:{"{3}"}"; - - /// - /// 缓存表计信息排序索引ZSET缓存Key,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 - /// - public const string CacheMeterInfoZSetScoresIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:{MeterInfo}:{"{2}"}:ZSetScoresIndex:{"{3}"}"; - + public const string CacheDeviceGroupSetIndexKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:DeviceGroupIndex"; public const string TaskInfo = "TaskInfo"; /// diff --git a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs index e3f79ce..178b625 100644 --- a/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs +++ b/shared/JiShe.CollectBus.Common/Models/DeviceCacheBasicModel.cs @@ -1,4 +1,5 @@ -using System; +using JiShe.CollectBus.Common.Enums; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -45,5 +46,11 @@ namespace JiShe.CollectBus.Common.Models /// 采集时间间隔(分钟,如15) /// public int TimeDensity { get; set; } + + /// + /// 表计类型 + /// 电表= 1,水表= 2,燃气表= 3,热能表= 4,水表流量计=5,燃气表流量计=6,特殊电表=7 + /// + public MeterTypeEnum MeterType { get; set; } } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index a58ded5..8674d67 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -6,7 +6,7 @@ @{ Layout = null; } - + diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index 4f290b5..a3ed9de 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -38,7 +38,7 @@ "Default": "mongodb://mongo_PmEeF3:lixiao1980@192.168.1.9:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Kafka": "192.168.1.9:29092,192.168.1.9:39092,192.168.1.9:49092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", - "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" + "EnergyDB": "server=rm-wz9hw529i3j1e3b5fbo.sqlserver.rds.aliyuncs.com,3433;database=db_energy;uid=yjdb;pwd=Kdjdhf+9*7ad222LL;Encrypt=False;Trust Server Certificate=False" }, "Redis": { "Configuration": "192.168.1.9:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",