using Confluent.Kafka;
using FreeRedis;
using FreeSql;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.FreeRedisProvider;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Localization;
using JiShe.CollectBus.Serializer;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;

namespace JiShe.CollectBus;

/// <summary>
/// Abstract base for CollectBus application services. Exposes the FreeSql and FreeRedis
/// providers resolved through the lazy service provider, and offers helpers that bulk-read
/// cached meter hashes from Redis with a Lua script.
/// </summary>
[ApiExplorerSettings(GroupName = CollectBusDomainSharedConsts.Business)]
public abstract class CollectBusAppService : ApplicationService
{
    /// <summary>
    /// Maximum number of Redis keys sent to the Lua script in a single EVAL call.
    /// </summary>
    private const int MeterCacheKeyPageSize = 10000;

    /// <summary>
    /// Lua script that runs HGETALL on every key in KEYS and returns an array of
    /// {key, flatFieldValueArray} pairs, in the same order as KEYS.
    /// </summary>
    private const string MeterHashBatchLuaScript = @"
        local results = {}
        for i, key in ipairs(KEYS) do
            local data = redis.call('HGETALL', key)
            results[i] = {key, data}
        end
        return results";

    /// <summary>
    /// FreeSql provider, resolved lazily as a required service.
    /// </summary>
    public IFreeSqlProvider SqlProvider => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();

    /// <summary>
    /// FreeRedis provider, resolved lazily as an optional service.
    /// NOTE(review): resolved with LazyGetService and then null-forgiven, so a missing
    /// registration surfaces as a NullReferenceException at first use — confirm this is intended.
    /// </summary>
    protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetService<IFreeRedisProvider>()!;

    /// <summary>
    /// Sets the localization resource and the object-mapper context used by derived services.
    /// </summary>
    protected CollectBusAppService()
    {
        LocalizationResource = typeof(CollectBusResource);
        ObjectMapperContext = typeof(CollectBusApplicationModule);
    }

    /// <summary>
    /// Bulk-reads cached meter information through a Lua script and groups it per Redis key.
    /// </summary>
    /// <typeparam name="T">Meter information type each hash value is deserialized into.</typeparam>
    /// <param name="redisKeys">Cache keys for the collection frequency; processed in pages of 10000 keys.</param>
    /// <param name="systemType">System type; used to build the cache-key prefix.</param>
    /// <param name="serverTagName">Server tag; used to build the cache-key prefix.</param>
    /// <param name="timeDensity">Collection frequency (1, 5 or 15 minutes); used to build the cache-key prefix.</param>
    /// <param name="meterType">Meter type; used to build the cache-key prefix.</param>
    /// <returns>
    /// A dictionary keyed by each Redis key with the formatted <c>RedisConst.CacheMeterInfoKey</c>
    /// text removed, whose value maps every hash field (meter id) to its deserialized meter.
    /// When the same outer key occurs more than once, later entries overwrite earlier ones.
    /// </returns>
    /// <exception cref="Exception">
    /// Code -101: invalid arguments; -102: a page returned no result; -103: a hash value was
    /// blank or could not be deserialized; -104: a page result was not an array.
    /// </exception>
    protected async Task<Dictionary<string, Dictionary<string, T>>> GetMeterRedisCacheDictionaryData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
    {
        if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
        {
            throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101");
        }

        var meterInfos = new Dictionary<string, Dictionary<string, T>>();

        // The prefix depends only on the method arguments, so build it once instead of once per key.
        var redisCacheKey = string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity);

        int totalPages = GetMeterCachePageCount(redisKeys.Length);
        for (int page = 0; page < totalPages; page++)
        {
            // Run the Lua script for the current page of keys.
            var merterResult = await EvalMeterHashPageAsync(redisKeys, page);
            if (merterResult == null)
            {
                throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,第 {page + 1} 页数据未返回,-102");
            }

            if (merterResult is not object[] arr)
            {
                throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 第 {page + 1} 页数据解析失败,返回类型不符,-104");
            }

            // Each item is {key, [field1, value1, field2, value2, ...]}.
            foreach (object[] item in arr)
            {
                string key = (string)item[0];
                object[] fieldsAndValues = (object[])item[1];
                string focusAddress = key.Replace(redisCacheKey, "");

                var meterHashs = new Dictionary<string, T>();
                for (int i = 0; i < fieldsAndValues.Length; i += 2)
                {
                    string meterId = (string)fieldsAndValues[i];
                    string meterStr = (string)fieldsAndValues[i + 1];

                    T? meterInfo = DeserializeMeterOrNull<T>(meterStr);
                    if (meterInfo == null)
                    {
                        throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 缓存表计数据异常,集中器 {key} 的表计 {meterId} 解析失败,-103");
                    }

                    meterHashs[meterId] = meterInfo;
                }

                // Merge into the overall result; entries for an already-seen key are overwritten.
                if (meterInfos.TryGetValue(focusAddress, out var existingHashs))
                {
                    foreach (var kvp in meterHashs)
                    {
                        existingHashs[kvp.Key] = kvp.Value;
                    }
                }
                else
                {
                    meterInfos[focusAddress] = meterHashs;
                }
            }
        }

        return meterInfos;
    }

    /// <summary>
    /// Bulk-reads cached meter information through a Lua script and returns it as a flat list.
    /// </summary>
    /// <typeparam name="T">Meter information type each hash value is deserialized into.</typeparam>
    /// <param name="redisKeys">Cache keys for the collection frequency; processed in pages of 10000 keys.</param>
    /// <param name="systemType">System type; validated as non-blank, otherwise unused by this method.</param>
    /// <param name="serverTagName">Server tag; validated as non-blank, otherwise unused by this method.</param>
    /// <param name="timeDensity">Collection frequency (1, 5 or 15 minutes); validated as non-blank, otherwise unused by this method.</param>
    /// <param name="meterType">Meter type; not used by this method, kept so the signature mirrors the dictionary variant.</param>
    /// <returns>Every deserialized meter from every requested hash, in the order the script returned them.</returns>
    /// <exception cref="Exception">
    /// Code -101: invalid arguments; -102: a page returned no result; -103: a hash value was
    /// blank or could not be deserialized; -104: a page result was not an array.
    /// </exception>
    protected async Task<List<T>> GetMeterRedisCacheListData<T>(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class
    {
        if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity))
        {
            throw new Exception($"{nameof(GetMeterRedisCacheListData)} 参数异常,-101");
        }

        var meterInfos = new List<T>();

        int totalPages = GetMeterCachePageCount(redisKeys.Length);
        for (int page = 0; page < totalPages; page++)
        {
            // Run the Lua script for the current page of keys.
            var merterResult = await EvalMeterHashPageAsync(redisKeys, page);
            if (merterResult == null)
            {
                throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据未返回,-102");
            }

            if (merterResult is not object[] arr)
            {
                throw new Exception($"{nameof(GetMeterRedisCacheListData)} 第 {page + 1} 页数据格式错误,-104");
            }

            // Each item is {key, [field1, value1, field2, value2, ...]}; the key itself is not
            // needed for a flat list, so only the field/value array is read.
            foreach (object[] item in arr)
            {
                object[] fieldsAndValues = (object[])item[1];
                for (int i = 0; i < fieldsAndValues.Length; i += 2)
                {
                    string meterId = (string)fieldsAndValues[i];
                    string meterStr = (string)fieldsAndValues[i + 1];

                    T? meterInfo = DeserializeMeterOrNull<T>(meterStr);
                    if (meterInfo == null)
                    {
                        throw new Exception(
                            $"{nameof(GetMeterRedisCacheListData)} 表计 {meterId} 解析失败(页 {page + 1}),-103"
                        );
                    }

                    meterInfos.Add(meterInfo);
                }
            }
        }

        return meterInfos;
    }

    /// <summary>
    /// Number of pages needed to cover <paramref name="keyCount"/> keys at the shared page size.
    /// </summary>
    private static int GetMeterCachePageCount(int keyCount)
    {
        return (int)Math.Ceiling(keyCount / (double)MeterCacheKeyPageSize);
    }

    /// <summary>
    /// Runs the shared HGETALL Lua script for the zero-based <paramref name="page"/> of
    /// <paramref name="redisKeys"/> and returns the raw script result.
    /// </summary>
    private async Task<object?> EvalMeterHashPageAsync(string[] redisKeys, int page)
    {
        var batchKeys = redisKeys
            .Skip(page * MeterCacheKeyPageSize)
            .Take(MeterCacheKeyPageSize)
            .ToArray();

        return await FreeRedisProvider.Instance.EvalAsync(MeterHashBatchLuaScript, batchKeys);
    }

    /// <summary>
    /// Deserializes a cached meter payload, returning null when the payload is blank or the
    /// deserializer yields null, so callers can report the failure with their own message.
    /// </summary>
    private static T? DeserializeMeterOrNull<T>(string? meterStr) where T : class
    {
        return string.IsNullOrWhiteSpace(meterStr) ? null : meterStr.Deserialize<T>();
    }
}