From 2fdf5850c8304e30b6f12cce8521883dbcde5bc6 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 14 Apr 2025 16:41:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BC=93=E5=AD=98=E7=94=B5?= =?UTF-8?q?=E8=A1=A8=E7=9A=84=E8=8E=B7=E5=8F=96=EF=BC=8C=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E5=88=86=E7=BB=84=E5=9D=87=E8=A1=A1=E7=9A=84=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IScheduledMeterReadingService.cs | 17 +- .../IWorkerSubscriberAppService.cs | 15 +- .../CollectBusAppService.cs | 142 ++++++++- .../CollectBusApplicationModule.cs | 2 +- .../Plugins/TcpMonitor.cs | 21 +- .../Samples/SampleAppService.cs | 43 ++- .../BasicScheduledMeterReadingService.cs | 293 +++++------------- ...nergySystemScheduledMeterReadingService.cs | 70 +++-- .../Subscribers/WorkerSubscriberAppService.cs | 61 +--- .../Helpers/DeviceGroupBalanceControl.cs | 125 ++++++-- .../AdminClient/AdminClientService.cs | 38 +++ .../AdminClient/IAdminClientService.cs | 24 ++ .../Extensions/ProtocolConstExtensions.cs | 1 - .../ProtocolConst.cs | 17 +- 14 files changed, 483 insertions(+), 386 deletions(-) diff --git a/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs index 15d60d9..0f04005 100644 --- a/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/ScheduledMeterReading/IScheduledMeterReadingService.cs @@ -77,22 +77,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading Task InitWatermeterCacheData(string gatherCode = ""); /// - /// 1分钟采集水表数据 + /// 水表数据采集 /// /// - Task WatermeterScheduledMeterOneMinuteReading(); - - /// - /// 5分钟采集水表数据,只获取任务数据下发,不构建任务 - /// - /// - Task WatermeterScheduledMeterFiveMinuteReading(); - - /// - /// 15分钟采集水表数据,只获取任务数据下发,不构建任务 - /// - /// - Task WatermeterScheduledMeterFifteenMinuteReading(); + Task WatermeterScheduledMeterAutoReading(); + #endregion diff --git a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 64dddf1..abba774 100644 --- a/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -39,19 +39,8 @@ namespace JiShe.CollectBus.Subscribers /// 1分钟采集水表数据下行消息消费订阅 /// /// - Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); - - /// - /// 5分钟采集水表数据下行消息消费订阅 - /// - /// - Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); - - /// - /// 15分钟采集水表数据下行消息消费订阅 - /// - /// - Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage issuedEventMessage); + #endregion } } diff --git a/src/JiShe.CollectBus.Application/CollectBusAppService.cs b/src/JiShe.CollectBus.Application/CollectBusAppService.cs index c634859..467b487 100644 --- a/src/JiShe.CollectBus.Application/CollectBusAppService.cs +++ b/src/JiShe.CollectBus.Application/CollectBusAppService.cs @@ -1,9 +1,13 @@ using FreeRedis; using FreeSql; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.FreeRedisProvider; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.Localization; +using JiShe.CollectBus.Serializer; using Microsoft.AspNetCore.Mvc; +using System; using System.Collections.Generic; using System.Threading.Tasks; using Volo.Abp.Application.Services; @@ -20,5 +24,141 @@ public abstract class CollectBusAppService : ApplicationService { LocalizationResource = typeof(CollectBusResource); ObjectMapperContext = typeof(CollectBusApplicationModule); - } + } + + /// + /// Lua脚本批量获取缓存的表计信息 + /// + /// 表信息数据对象 + /// 采集频率对应的缓存Key集合 + /// 系统类型 + /// 服务器标识 + /// 采集频率,1分钟、5分钟、15分钟 + /// 表计类型 + /// + protected async Task>> GetMeterRedisCacheDictionaryData(string[] redisKeys, string systemType, string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class + { + if (redisKeys == null || redisKeys.Length <=0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,参数异常,-101"); + } + + //通过lua脚本一次性获取所有缓存内容 + var luaScript = @" + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS + if (merterResult == null) + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息失败,没有获取到数据,-102"); + } + + // 解析结果(结果为嵌套数组) + var meterInfos = new Dictionary>(); ; + if (merterResult is object[] arr) + { + foreach (object[] item in arr) + { + string key = (string)item[0];//集中器地址对应的Redis缓存Key + object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; + string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 + + var meterHashs = new Dictionary(); + for (int i = 0; i < fieldsAndValues.Length; i += 2) + { + string meterld = (string)fieldsAndValues[i];//表ID + string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterHashs[meterld] = meterInfo; + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheDictionaryData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-102"); + } + } + meterInfos[focusAddress] = meterHashs; + } + } + + return meterInfos; + } + + /// + /// Lua脚本批量获取缓存的表计信息 + /// + /// 表信息数据对象 + /// 采集频率对应的缓存Key集合 + /// 系统类型 + /// 服务器标识 + /// 采集频率,1分钟、5分钟、15分钟 + /// 表计类型 + /// + protected async Task> GetMeterRedisCacheListData(string[] redisKeys,string systemType,string serverTagName, string timeDensity, MeterTypeEnum meterType) where T : class + { + if (redisKeys == null || redisKeys.Length <= 0 || string.IsNullOrWhiteSpace(systemType) || string.IsNullOrWhiteSpace(serverTagName) || string.IsNullOrWhiteSpace(timeDensity)) + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,参数异常,-101"); + } + + //通过lua脚本一次性获取所有缓存内容 + var luaScript = @" + local results = {} + for i, key in ipairs(KEYS) do + local data = redis.call('HGETALL', key) + results[i] = {key, data} + end + return results"; + var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS + if (merterResult == null) + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息失败,没有获取到数据,-102"); + } + + // 解析结果(结果为嵌套数组) + var meterInfos = new List(); ; + if (merterResult is object[] arr) + { + foreach (object[] item in arr) + { + string key = (string)item[0];//集中器地址对应的Redis缓存Key + object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, systemType, serverTagName, meterType, timeDensity)}"; + string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 + + for (int i = 0; i < fieldsAndValues.Length; i += 2) + { + string meterld = (string)fieldsAndValues[i];//表ID + string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 + + T meterInfo = default!; + if (!string.IsNullOrWhiteSpace(meterStr)) + { + meterInfo = meterStr.Deserialize()!; + } + if (meterInfo != null) + { + meterInfos.Add(meterInfo); + } + else + { + throw new Exception($"{nameof(GetMeterRedisCacheListData)} 获取缓存的表计信息集中器缓存{key}数据的{meterld}处理异常,-103"); + } + } + } + } + + return meterInfos; + } } diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 1a8b326..008e736 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -59,7 +59,7 @@ public class CollectBusApplicationModule : AbpModule //默认初始化表计信息 var dbContext = context.ServiceProvider.GetRequiredService(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); - dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + //dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); //初始化主题信息 var kafkaAdminClient = context.ServiceProvider.GetRequiredService(); diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index c348a3d..ff4de0c 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; @@ -141,7 +142,15 @@ namespace JiShe.CollectBus.Plugins await client.ResetIdAsync(deviceNo); - var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); + var deviceInfoList= await _deviceRepository.GetListAsync(a => a.Number == deviceNo); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); if (entity == null) { await _deviceRepository.InsertAsync(new Device(deviceNo, oldClientId, DateTime.Now, DateTime.Now, DeviceStatus.Online)); @@ -171,7 +180,15 @@ namespace JiShe.CollectBus.Plugins string clientId = deviceNo; string oldClientId = $"{client.Id}"; - var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); + var deviceInfoList = await _deviceRepository.GetListAsync(a => a.Number == deviceNo); + if (deviceInfoList != null && deviceInfoList.Count > 1) + { + //todo 推送集中器编号重复预警 + _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复"); + return; + } + + var entity = deviceInfoList?.FirstOrDefault(a => a.Number == deviceNo); if (entity == null) //没有登录帧的设备,只有心跳帧 { await client.ResetIdAsync(clientId); diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index 28b6ab0..ac09237 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -17,6 +17,9 @@ using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IotSystems.AFNEntity; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using Microsoft.Extensions.DependencyInjection; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Enums; +using System.Diagnostics.Metrics; namespace JiShe.CollectBus.Samples; @@ -108,21 +111,49 @@ public class SampleAppService : CollectBusAppService, ISampleAppService [HttpGet] public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000) { - var deviceList = new List(); - for (int i = 0; i < deviceCount; i++) + //var deviceList = new List(); + //for (int i = 0; i < deviceCount; i++) + //{ + // deviceList.Add($"Device_{Guid.NewGuid()}"); + //} + + //// 初始化缓存 + //DeviceGroupBalanceControl.InitializeCache(deviceList); + + var timeDensity = "15"; + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, "Energy", "JiSheCollectBus", MeterTypeEnum.Ammeter.ToString(), timeDensity)}*"; + + var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, "Energy", "JiSheCollectBus", timeDensity, MeterTypeEnum.Ammeter); + List focusAddressDataLista = new List(); + foreach (var item in meterInfos) { - deviceList.Add($"Device_{Guid.NewGuid()}"); + focusAddressDataLista.Add(item.FocusAddress); } - // 初始化缓存 - DeviceGroupBalanceControl.InitializeCache(deviceList); - + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + // 打印分布统计 DeviceGroupBalanceControl.PrintDistributionStats(); await Task.CompletedTask; } + /// + /// 测试设备分组均衡控制算法获取分组Id + /// + /// + /// + [HttpGet] + public async Task TestGetDeviceGroupBalanceControl(string deviceAddress) + { + var groupId = DeviceGroupBalanceControl.GetDeviceGroupId(deviceAddress); + Console.WriteLine(groupId); + + await Task.CompletedTask; + } + /// /// 测试单个测点数据项 diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4373dd3..d032194 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ -using DotNetCore.CAP; +using DeviceDetectorNET.Class.Client; +using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; @@ -18,6 +19,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace JiShe.CollectBus.ScheduledMeterReading @@ -32,6 +34,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; + public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher producerBus, @@ -79,6 +82,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task CreateToBeIssueTasks() { + + //创建指定数量的线程, + var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*"; var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey); if (taskInfos == null || taskInfos.Length <= 0) @@ -94,22 +100,24 @@ namespace JiShe.CollectBus.ScheduledMeterReading { _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}没有缓存数据,102"); continue; - } - - //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 - if (!IsGennerateCmd(tasksToBeIssueModel.NextTaskTime)) - { - _logger.LogWarning($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); - continue; - } + } //item 为 CacheTasksToBeIssuedKey 对应的缓存待下发的指令生产任务数据Redis Key tempArryay[0]=>CollectBus,tempArryay[1]=>SystemTypeConst,tempArryay[2]=>TaskInfo,tempArryay[3]=>表计类别,tempArryay[4]=>采集频率 var tempArryay = item.Split(":"); string meteryType = tempArryay[3];//表计类别 int timeDensity = Convert.ToInt32(tempArryay[4]);//采集频率 + //检查任务时间节点,由于定时任务10秒钟运行一次,需要判定当前时间是否在任务时间节点内,不在则跳过 + if (!IsTaskTime(tasksToBeIssueModel.NextTaskTime, timeDensity)) + { + _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} 构建待处理的下发指令任务处理时Key=>{item}时间节点不在当前时间范围内,103"); + continue; + } + + + //获取缓存中的电表信息 - var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meteryType, timeDensity)}*"; + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, meteryType, timeDensity)}*"; var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { @@ -117,10 +125,12 @@ namespace JiShe.CollectBus.ScheduledMeterReading return; } + var meterTypes = EnumExtensions.ToEnumDictionary(); + if (meteryType == MeterTypeEnum.Ammeter.ToString()) { // 解析结果(结果为嵌套数组) - var meterInfos = await GetMeterRedisCacheData(oneMinutekeyList, $"{timeDensity}", meteryType); + var meterInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, $"{timeDensity}", meterTypes[meteryType]); if (meterInfos == null || meterInfos.Count <= 0) { _logger.LogError($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建失败,没有获取到缓存信息,-105"); @@ -140,6 +150,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogInformation($"{nameof(CreateToBeIssueTasks)} {timeDensity}分钟采集待下发任务创建完成"); + + //根据当前的采集频率和类型,重新更新下一个任务点,把任务的创建源固定在当前逻辑,避免任务处理的逻辑异常导致任务创建失败。 tasksToBeIssueModel.NextTaskTime = tasksToBeIssueModel.NextTaskTime.AddMinutes(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); @@ -165,7 +177,26 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { +#if DEBUG + var timeDensity = "15"; + //获取缓存中的电表信息 + var redisKeyList = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}*"; + + var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); + var meterInfos = await GetMeterRedisCacheListData(oneMinutekeyList, SystemType, ServerTagName, timeDensity, MeterTypeEnum.Ammeter); + List focusAddressDataLista = new List(); + foreach (var item in meterInfos) + { + focusAddressDataLista.Add(item.FocusAddress); + } + + DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + return; +#else var meterInfos = await GetAmmeterInfoList(gatherCode); +#endif + + if (meterInfos == null || meterInfos.Count <= 0) { throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,电表数据为空"); @@ -199,7 +230,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading #if DEBUG //每次缓存时,删除缓存,避免缓存数据有不准确的问题 - await FreeRedisProvider.Instance.DelAsync(redisCacheKey); + //await FreeRedisProvider.Instance.DelAsync(redisCacheKey); #else //每次缓存时,删除缓存,避免缓存数据有不准确的问题 await FreeRedisProvider.Instance.DelAsync(redisCacheKey); @@ -299,7 +330,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -363,7 +394,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.Ammeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fiveMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -427,7 +458,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.Ammeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(fifteenMinutekeyList, SystemType, ServerTagName, timeDensity.ToString(), MeterTypeEnum.Ammeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理时没有获取到缓存信息,-102"); @@ -713,7 +744,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading AFN = aFN, Fn = fn, ItemCode = tempItem, - TaskMark = CommonHelper.GetTaskMark((int)aFN, fn,ammeter.MeteringCode), + TaskMark = CommonHelper.GetTaskMark((int)aFN, fn, ammeter.MeteringCode), ManualOrNot = false, Pn = ammeter.MeteringCode, IssuedMessageId = GuidGenerator.Create().ToString(), @@ -799,10 +830,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading } /// - /// 1分钟采集水表数据 + /// 水表数据采集 /// /// - public virtual async Task WatermeterScheduledMeterOneMinuteReading() + public virtual async Task WatermeterScheduledMeterAutoReading() { //获取缓存中的水表信息 int timeDensity = 1; @@ -810,15 +841,15 @@ namespace JiShe.CollectBus.ScheduledMeterReading var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); + _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); return; } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(oneMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheDictionaryData(oneMinutekeyList,SystemType,ServerTagName ,timeDensity.ToString(), MeterTypeEnum.WaterMeter); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { - _logger.LogError($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); + _logger.LogError($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); return; } @@ -836,7 +867,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); + await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -856,211 +887,44 @@ namespace JiShe.CollectBus.ScheduledMeterReading //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); + _logger.LogInformation($"{nameof(WatermeterScheduledMeterAutoReading)} {timeDensity}分钟采集水表数据处理完成"); } - /// - /// 5分钟采集水表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFiveMinuteReading() - { - - //获取缓存中的电表信息 - int timeDensity = 5; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); - var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); - return; - } - - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fiveMinutekeyList, timeDensity.ToString(), ((int)MeterTypeEnum.WaterMeter).ToString()); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); - } - - /// - /// 15分钟采集水表数据 - /// - /// - public virtual async Task WatermeterScheduledMeterFifteenMinuteReading() - { - //获取缓存中的电表信息 - int timeDensity = 15; - var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); - var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); - if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-101"); - return; - } - - //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); - if (meterTaskInfos == null || meterTaskInfos.Count <= 0) - { - _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); - return; - } - - List meterTaskInfosList = new List(); - - //将取出的缓存任务数据发送到Kafka消息队列中 - foreach (var focusItem in meterTaskInfos) - { - foreach (var ammerterItem in focusItem.Value) - { - var tempMsg = new ScheduledMeterReadingIssuedEventMessage() - { - MessageHexString = ammerterItem.Value.IssuedMessageHexString, - MessageId = ammerterItem.Value.IssuedMessageId, - FocusAddress = ammerterItem.Value.FocusAddress, - TimeDensity = timeDensity.ToString(), - }; - //await _producerBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName, tempMsg); - - //_ = _producerBus.Publish(tempMsg); - - - meterTaskInfosList.Add(ammerterItem.Value); - } - } - if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) - { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); - } - - ////删除任务数据 - //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); - - ////缓存下一个时间的任务 - //await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); - - - _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); - } #endregion #region 公共处理方法 - /// - /// Lua脚本批量获取缓存的表计信息 - /// - /// 表信息数据对象 - /// 采集频率对应的缓存Key集合 - /// 采集频率,1分钟、5分钟、15分钟 - /// 表计类型 - /// - private async Task>> GetMeterRedisCacheData(string[] redisKeys, string timeDensity, string meterType) where T : class - { - //通过lua脚本一次性获取所有缓存内容 - var luaScript = @" - local results = {} - for i, key in ipairs(KEYS) do - local data = redis.call('HGETALL', key) - results[i] = {key, data} - end - return results"; - var merterResult = await FreeRedisProvider.Instance.EvalAsync(luaScript, redisKeys); //传递 KEYS - if (merterResult == null) - { - _logger.LogError($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时没有获取到缓存信息,-102"); - return null; - } - - // 解析结果(结果为嵌套数组) - var meterInfos = new Dictionary>(); ; - if (merterResult is object[] arr) - { - foreach (object[] item in arr) - { - string key = (string)item[0];//集中器地址对应的Redis缓存Key - object[] fieldsAndValues = (object[])item[1];//缓存Key对应的Hash表数据集合 - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, meterType, timeDensity)}"; - string focusAddress = key.Replace(redisCacheKey, "");//集中器地址 - - var meterHashs = new Dictionary(); - for (int i = 0; i < fieldsAndValues.Length; i += 2) - { - string meterld = (string)fieldsAndValues[i];//表ID - string meterStr = (string)fieldsAndValues[i + 1];//表详情数据 - - T meterInfo = default!; - if (!string.IsNullOrWhiteSpace(meterStr)) - { - meterInfo = meterStr.Deserialize()!; - } - if (meterInfo != null) - { - meterHashs[meterld] = meterInfo; - } - else - { - _logger.LogInformation($"{nameof(GetMeterRedisCacheData)} 定时任务采集表数据处理时集中器缓存{key}数据的{meterld}处理异常"); - } - } - meterInfos[focusAddress] = meterHashs; - } - } - - return meterInfos; - } + /// - /// 指定时间对比当前时间 + /// 判断是否需要生成采集指令 /// - /// - /// + /// + /// /// - private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) + private bool IsTaskTime(DateTime nextTaskTime, int timeDensity = 0) { - if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 - return false; - return true; + if (DateTime.Now.AddMinutes(timeDensity) >= nextTaskTime) + { + return true; + } + + return false; } + ///// + ///// 指定时间对比当前时间 + ///// + ///// + ///// + ///// + //private bool IsGennerateCmd(DateTime lastTime, int subtrahend = 0) + //{ + // if (DateTime.Now.AddDays(subtrahend) >= lastTime)//当前时间减去一天,大于等于最后在线时间,不再生成该集中器下表生成采集指令 + // return false; + // return true; + //} + ///// ///// 缓存下一个时间的任务 ///// @@ -1091,6 +955,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*"; } + #endregion } diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 12453bc..62ede1d 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -71,37 +71,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading public override async Task> GetAmmeterInfoList(string gatherCode = "V4-Gather-8890") { - List ammeterInfos = new List(); - ammeterInfos.Add(new AmmeterInfo() - { - Baudrate = 2400, - FocusAddress = "402440506", - Name = "张家祠工务(三相电表)", - FocusID = 95780, - DatabaseBusiID = 1, - MeteringCode = 1, - AmmerterAddress = "402410040506", - ID = 127035, - TypeName = 3, - DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", - TimeDensity = 15, - }); - ammeterInfos.Add(new AmmeterInfo() - { - Baudrate = 2400, - FocusAddress = "542400504", - Name = "五号配(长芦二所四排)(单相电表)", - FocusID = 69280, - DatabaseBusiID = 1, - MeteringCode = 2, - AmmerterAddress = "542410000504", - ID = 95594, - TypeName = 1, - DataTypes = "581,589,592,597,601", - TimeDensity = 15, - }); + //List ammeterInfos = new List(); + //ammeterInfos.Add(new AmmeterInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "402440506", + // Name = "张家祠工务(三相电表)", + // FocusID = 95780, + // DatabaseBusiID = 1, + // MeteringCode = 1, + // AmmerterAddress = "402410040506", + // ID = 127035, + // TypeName = 3, + // DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", + // TimeDensity = 15, + //}); + //ammeterInfos.Add(new AmmeterInfo() + //{ + // Baudrate = 2400, + // FocusAddress = "542400504", + // Name = "五号配(长芦二所四排)(单相电表)", + // FocusID = 69280, + // DatabaseBusiID = 1, + // MeteringCode = 2, + // AmmerterAddress = "542410000504", + // ID = 95594, + // TypeName = 1, + // DataTypes = "581,589,592,597,601", + // TimeDensity = 15, + //}); - return ammeterInfos; + //return ammeterInfos; string sql = $@"SELECT C.ID,C.Name,C.FocusID,C.SingleRate,C.MeteringCode,C.Code AS BrandType,C.Baudrate,C.Password,C.MeteringPort,C.[Address] AS AmmerterAddress,C.TypeName,C.Protocol,C.TripState,C.[State],B.[Address],B.AreaCode,B.AutomaticReport,D.DataTypes,B.TimeDensity,A.GatherCode,C.Special,C.[ProjectID],B.AbnormalState,B.LastTime,CONCAT(B.AreaCode, B.[Address]) AS FocusAddress,(select top 1 DatabaseBusiID from TB_Project where ID = B.ProjectID) AS DatabaseBusiID FROM TB_GatherInfo(NOLOCK) AS A @@ -111,10 +111,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading WHERE 1=1 and C.Special = 0 "; //TODO 记得移除特殊表过滤 - if (!string.IsNullOrWhiteSpace(gatherCode)) - { - sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; - } + //if (!string.IsNullOrWhiteSpace(gatherCode)) + //{ + // sql = $@"{sql} AND A.GatherCode = '{gatherCode}'"; + //} return await SqlProvider.Instance.Change(DbEnum.EnergyDB) .Ado .QueryAsync(sql); @@ -187,6 +187,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading deviceList.Add($"Device_{Guid.NewGuid()}"); } + // 初始化缓存 + DeviceGroupBalanceControl.InitializeCache(deviceList); // 打印分布统计 DeviceGroupBalanceControl.PrintDistributionStats(); diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 4428fe4..3caba20 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -147,69 +147,16 @@ namespace JiShe.CollectBus.Subscribers #endregion #region 水表消息采集 + /// - /// 1分钟采集水表数据下行消息消费订阅 - /// - /// - /// - [HttpPost] - [Route("watermeter/oneminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerOneMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterOneMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) - { - _logger.LogInformation("1分钟采集水表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【1分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - - } - } - } - - /// - /// 5分钟采集水表数据下行消息消费订阅 - /// - /// - /// - [HttpPost] - [Route("watermeter/fiveminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFiveMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterFiveMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) - { - _logger.LogInformation("5分钟采集水表数据下行消息消费队列开始处理"); - var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); - if (protocolPlugin == null) - { - _logger.LogError("【5分钟采集水表数据下行消息消费队列开始处理】协议不存在!"); - } - else - { - var device = await _deviceRepository.FindAsync(a => a.Number == receivedMessage.FocusAddress); - if (device != null) - { - await _tcpService.SendAsync(device.ClientId, Convert.FromHexString(receivedMessage.MessageHexString)); - - } - } - } - - /// - /// 15分钟采集水表数据下行消息消费订阅 + /// 水表数据下行消息消费订阅 /// /// /// [HttpPost] [Route("watermeter/fifteenminute/issued-event")] - [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerFifteenMinuteIssuedEventName)] - public async Task WatermeterScheduledMeterFifteenMinuteReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) + [CapSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] + public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(ScheduledMeterReadingIssuedEventMessage receivedMessage) { _logger.LogInformation("15分钟采集水表数据下行消息消费队列开始处理"); var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin"); diff --git a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs index c48bad3..657ff7c 100644 --- a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs +++ b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs @@ -1,5 +1,6 @@ using JiShe.CollectBus.FreeRedisProvider; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -12,38 +13,98 @@ namespace JiShe.CollectBus.Common.Helpers /// 设备组负载控制 /// public class DeviceGroupBalanceControl - { - /// - /// 分组集合 - /// - private static List[] _cachedGroups; + { + private static readonly object _syncRoot = new object(); + + private static volatile CacheState _currentCache; /// - /// 设备分组关系映射 + /// 使用ConcurrentDictionary保证线程安全的设备分组映射 /// - private static Dictionary _balancedMapping; - - - /// - /// 初始化缓存并强制均衡 - /// - public static void InitializeCache(List deviceList,int groupCount = 50) + private sealed class CacheState { - // 步骤1: 生成均衡映射表 - _balancedMapping = CreateBalancedMapping(deviceList, groupCount); + public readonly ConcurrentDictionary BalancedMapping; + public readonly List[] CachedGroups; - // 步骤2: 根据映射表填充分组 - _cachedGroups = new List[groupCount]; - for (int i = 0; i < groupCount; i++) + public CacheState(int groupCount) { - _cachedGroups[i] = new List(capacity: deviceList.Count / groupCount + 1); + BalancedMapping = new ConcurrentDictionary(); + CachedGroups = new List[groupCount]; + for (int i = 0; i < groupCount; i++) + { + CachedGroups[i] = new List(); + } + } + } + + /// + /// 初始化或增量更新缓存 + /// + public static void InitializeCache(List deviceList, int groupCount = 60) + { + lock (_syncRoot) + { + // 首次初始化 + if (_currentCache == null) + { + var newCache = new CacheState(groupCount); + UpdateCacheWithDevices(newCache, deviceList, groupCount); + _currentCache = newCache; + } + // 后续增量更新 + else + { + if (_currentCache.CachedGroups.Length != groupCount) + throw new ArgumentException("Group count cannot change after initial initialization"); + + var clonedCache = CloneExistingCache(); + UpdateCacheWithDevices(clonedCache, deviceList, groupCount); + _currentCache = clonedCache; + } + } + } + + /// + /// 带锁的缓存克隆(写入时复制) + /// + private static CacheState CloneExistingCache() + { + var oldCache = _currentCache; + var newCache = new CacheState(oldCache.CachedGroups.Length); + + // 复制已有映射 + foreach (var kvp in oldCache.BalancedMapping) + { + newCache.BalancedMapping.TryAdd(kvp.Key, kvp.Value); } + // 复制分组数据 + for (int i = 0; i < oldCache.CachedGroups.Length; i++) + { + newCache.CachedGroups[i].AddRange(oldCache.CachedGroups[i]); + } + + return newCache; + } + + /// + /// 更新设备到缓存 + /// + private static void UpdateCacheWithDevices(CacheState cache, List deviceList, int groupCount) + { foreach (var deviceId in deviceList) { - int groupId = _balancedMapping[deviceId]; - _cachedGroups[groupId].Add(deviceId); - } + // 原子操作:如果设备不存在则计算分组 + cache.BalancedMapping.GetOrAdd(deviceId, id => + { + int groupId = GetGroupId(id, groupCount); + lock (cache.CachedGroups[groupId]) + { + cache.CachedGroups[groupId].Add(id); + } + return groupId; + }); + } } /// @@ -51,11 +112,11 @@ namespace JiShe.CollectBus.Common.Helpers /// public static List GetGroup(string deviceId) { - if (_balancedMapping == null || _cachedGroups == null) + var cache = _currentCache; + if (cache == null) throw new InvalidOperationException("缓存未初始化"); - int groupId = _balancedMapping[deviceId]; - return _cachedGroups[groupId]; + return cache.CachedGroups[cache.BalancedMapping[deviceId]]; } /// @@ -63,10 +124,11 @@ namespace JiShe.CollectBus.Common.Helpers /// public static int GetDeviceGroupId(string deviceId) { - if (_balancedMapping == null || _cachedGroups == null) + var cache = _currentCache; + if (cache == null) throw new InvalidOperationException("缓存未初始化"); - return _balancedMapping[deviceId]; + return cache.BalancedMapping[deviceId]; } @@ -162,7 +224,14 @@ namespace JiShe.CollectBus.Common.Helpers /// public static void PrintDistributionStats() { - var stats = _cachedGroups + var cache = _currentCache; + if (cache == null) + { + Console.WriteLine("缓存未初始化"); + return; + } + + var stats = cache.CachedGroups .Select((group, idx) => new { GroupId = idx, Count = group.Count }) .OrderBy(x => x.GroupId); diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs index 217e35c..e151145 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/AdminClientService.cs @@ -69,6 +69,30 @@ namespace JiShe.CollectBus.Kafka.AdminClient return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); } + /// + /// 判断Kafka主题是否存在 + /// + /// 主题名称 + /// 副本数量,不能高于Brokers数量 + /// + public async Task CheckTopicAsync(string topic,int numPartitions) + { + var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(5)); + if(numPartitions > metadata.Brokers.Count) + { + throw new Exception($"{nameof(CheckTopicAsync)} 主题检查时,副本数量大于了节点数量。") ; + } + + return await Task.FromResult(metadata.Topics.Exists(a => a.Topic == topic)); + } + + //// + /// 创建Kafka主题 + /// + /// 主题名称 + /// 主题分区数量 + /// 副本数量,不能高于Brokers数量 + /// public async Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor) { @@ -96,17 +120,31 @@ namespace JiShe.CollectBus.Kafka.AdminClient } } + /// + /// 删除Kafka主题 + /// + /// + /// public async Task DeleteTopicAsync(string topic) { await Instance.DeleteTopicsAsync(new[] { topic }); } + /// + /// 获取Kafka主题列表 + /// + /// public async Task> ListTopicsAsync() { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); return new List(metadata.Topics.Select(t => t.Topic)); } + /// + /// 判断Kafka主题是否存在 + /// + /// + /// public async Task TopicExistsAsync(string topic) { var metadata = Instance.GetMetadata(TimeSpan.FromSeconds(10)); diff --git a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs index c3d332d..722c485 100644 --- a/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/AdminClient/IAdminClientService.cs @@ -8,9 +8,33 @@ namespace JiShe.CollectBus.Kafka.AdminClient { public interface IAdminClientService { + /// + /// 创建Kafka主题 + /// + /// 主题名称 + /// 主题分区数量 + /// 副本数量,不能高于Brokers数量 + /// Task CreateTopicAsync(string topic, int numPartitions, short replicationFactor); + + /// + /// 删除Kafka主题 + /// + /// + /// Task DeleteTopicAsync(string topic); + + /// + /// 获取Kafka主题列表 + /// + /// Task> ListTopicsAsync(); + + /// + /// 判断Kafka主题是否存在 + /// + /// + /// Task TopicExistsAsync(string topic); } } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs index 0812aae..fc52f31 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Extensions/ProtocolConstExtensions.cs @@ -49,7 +49,6 @@ namespace JiShe.CollectBus.Protocol.Contracts //动态上报主题,需根据协议的AFN功能码动态获取 var afnList = EnumExtensions.ToNameValueDictionary(); - //需要排除的AFN功能码 var excludeItems = new List() { 6, 7, 8,15 }; diff --git a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs index e7caa1c..d1e5739 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/ProtocolConst.cs @@ -70,22 +70,9 @@ namespace JiShe.CollectBus.Protocol.Contracts #region 水表消息主题 /// - /// 1分钟采集水表数据下行消息主题 + /// 水表数据下行消息主题,由于水表采集频率不高,所以一个主题就够 /// - public const string WatermeterSubscriberWorkerOneMinuteIssuedEventName = "issued.auto.one.watermeter.event"; - /// - /// 5分钟采集水表数据下行消息主题 - /// - public const string WatermeterSubscriberWorkerFiveMinuteIssuedEventName = "issued.auto.five.watermeter.event"; - /// - /// 15分钟采集水表数据下行消息主题 - /// - public const string WatermeterSubscriberWorkerFifteenMinuteIssuedEventName = "issued.auto.fifteen.watermeter.event"; - - /// - /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号等 - /// - public const string WatermeterSubscriberWorkerOtherIssuedEventName = "issued.auto.other.watermeter.event"; + public const string WatermeterSubscriberWorkerAutoReadingIssuedEventName = "issued.auto.reading.watermeter.event"; /// /// 水表自动阀控