From 7e0aa8169bd47838555335f6b7e4f385bac0b841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=9B=8A?= Date: Wed, 9 Apr 2025 23:11:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E8=AE=BE=E5=A4=87=E7=BB=84?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E6=8E=A7=E5=88=B6=EF=BC=8C=E7=94=A8=E4=BA=8E?= =?UTF-8?q?Kafka=E4=B8=BB=E9=A2=98=E5=88=86=E5=8C=BA=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E7=9A=84=E5=9D=87=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Samples/SampleAppService.cs | 24 +++ .../BasicScheduledMeterReadingService.cs | 169 ++++++++---------- .../Helpers/CommonHelper.cs | 48 +---- .../Helpers/DeviceGroupBalanceControl.cs | 163 +++++++++++++++++ 4 files changed, 259 insertions(+), 145 deletions(-) create mode 100644 src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs index d2b2b47..2b3962d 100644 --- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using JiShe.CollectBus.IoTDBProvider.Context; using Microsoft.Extensions.Logging; +using JiShe.CollectBus.Common.Helpers; namespace JiShe.CollectBus.Samples; @@ -83,6 +84,29 @@ public class SampleAppService : CollectBusAppService, ISampleAppService await _iotDBProvider.InsertAsync(meter); } + /// + /// 测试设备分组均衡控制算法 + /// + /// + /// + [HttpGet] + public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000) + { + var deviceList = new List(); + for (int i = 0; i < deviceCount; i++) + { + deviceList.Add($"Device_{Guid.NewGuid()}"); + } + + // 初始化缓存 + DeviceGroupBalanceControl.InitializeCache(deviceList); + + // 打印分布统计 + DeviceGroupBalanceControl.PrintDistributionStats(); + + await Task.CompletedTask; + } + public Task GetAsync() { diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index ccf7aa5..5af174a 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,39 +1,23 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Net; -using System.Threading.Tasks; -using DotNetCore.CAP; -using DotNetCore.CAP.Messages; -using FreeSql; -using FreeSql.Internal.CommonProvider; +using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; -using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Helpers; -using JiShe.CollectBus.Common.Models; -using JiShe.CollectBus.Enums; using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.IoTDBProvider; -using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; -using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; using JiShe.CollectBus.Protocol.Contracts; -using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; -using JiShe.CollectBus.Workers; -using MassTransit; -using MassTransit.Internals.GraphValidation; -using MassTransit.Transports; using Microsoft.Extensions.Logging; -using Volo.Abp.Domain.Repositories; -using static FreeSql.Internal.GlobalFilter; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -43,9 +27,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService { private readonly ILogger _logger; - private readonly ICapPublisher _producerBus; + private readonly ICapPublisher _producerBus; private readonly IIoTDBProvider _dbProvider; - private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; + private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; public BasicScheduledMeterReadingService( ILogger logger, @@ -94,7 +78,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task CreateToBeIssueTasks() { - var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:TaskInfo:*"; + var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*"; var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey); if (taskInfos == null || taskInfos.Length <= 0) { @@ -185,6 +169,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空"); } + List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。 + //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); foreach (var itemTimeDensity in meterInfoGroupByTimeDensity) @@ -198,7 +184,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading continue; } - var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; #if DEBUG //每次缓存时,删除缓存,避免缓存数据有不准确的问题 @@ -258,14 +244,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); } - //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行 + //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务 TasksToBeIssueModel nextTask = new TasksToBeIssueModel() { TimeDensity = itemTimeDensity.Key, NextTask = DateTime.Now.AddMinutes(1) }; - var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key); + var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); } @@ -279,10 +265,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task AmmeterScheduledMeterOneMinuteReading() { //获取缓存中的电表信息 - int timeDensity = 5; + int timeDensity = 1; var currentTime = DateTime.Now; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { @@ -324,21 +310,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading //_dbProvider.SwitchSessionPool(true); //await _dbProvider.InsertAsync(meterTaskInfosList); - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); } //删除任务数据 await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); + await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); - //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); _logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); @@ -354,7 +332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading int timeDensity = 5; var currentTime = DateTime.Now; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) { @@ -384,7 +362,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - _= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); + _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -393,21 +371,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime); } //删除任务数据 await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); _logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成"); } @@ -425,7 +396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading int timeDensity = 15; var currentDateTime = DateTime.Now; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*"; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) { @@ -456,7 +427,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading TimeDensity = timeDensity.ToString(), }; - _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); + _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); @@ -465,21 +436,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentDateTime); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime); } //删除任务数据 //await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter); stopwatch.Stop(); @@ -504,7 +468,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading timeDensity = 15; } - if ( focusGroup == null || focusGroup.Count <= 0) + if (focusGroup == null || focusGroup.Count <= 0) { _logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101"); return; @@ -572,7 +536,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading foreach (var focusInfo in focusGroup) { //构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型 - var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; + var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}"; foreach (var ammeterInfo in focusInfo.Value) { @@ -732,7 +696,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IssuedMessageHexString = Convert.ToHexString(dataInfos), }; meterReadingRecords.CreateDataId(GuidGenerator.Create()); - + keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords); } await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs); @@ -817,8 +781,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading public virtual async Task WatermeterScheduledMeterOneMinuteReading() { //获取缓存中的水表信息 - int timeDensity = 5; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; + int timeDensity = 1; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0) { @@ -858,21 +822,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList); //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); _logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); @@ -887,7 +844,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading //获取缓存中的电表信息 int timeDensity = 5; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) { @@ -927,21 +884,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList); //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); @@ -955,7 +905,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { //获取缓存中的电表信息 int timeDensity = 15; - var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*"; + var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter); var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0) { @@ -964,7 +914,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取下发任务缓存数据 - Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); + Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString()); if (meterTaskInfos == null || meterTaskInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102"); @@ -995,21 +945,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading } if (meterTaskInfosList != null && meterTaskInfosList.Count > 0) { - await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); + await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList); } //删除任务数据 await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList); //缓存下一个时间的任务 - TasksToBeIssueModel nextTask = new TasksToBeIssueModel() - { - TimeDensity = timeDensity, - NextTask = DateTime.Now.AddMinutes(timeDensity) - }; - - var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity); - await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter); _logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成"); @@ -1094,7 +1037,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading return true; } + /// + /// 缓存下一个时间的任务 + /// + /// 采集频率 + /// 表类型 + /// + private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType) + { + //缓存下一个时间的任务 + TasksToBeIssueModel nextTask = new TasksToBeIssueModel() + { + TimeDensity = timeDensity, + NextTask = DateTime.Now.AddMinutes(timeDensity) + }; + + var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity); + await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask); + } + + + /// + /// 获取缓存表计下发指令缓存key前缀 + /// + /// + /// + /// + private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType) + { + return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*"; + } #endregion - + } } diff --git a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 5cf77b1..33a0a38 100644 --- a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -759,52 +759,6 @@ namespace JiShe.CollectBus.Common.Helpers } return fontValue; - } - - /// - /// 固定Kafka主题分组数为50,避免动态计算 - /// - private const int FixedGroupCount = 50; - - /// - /// 根据 deviceId 获取其所属分组ID(0~49) - /// - public static int GetGroupId(string deviceId) - { - int hashCode = deviceId.GetHashCode(); - // 更安全的非负取模方式,兼容负数哈希码 - return (hashCode % FixedGroupCount + FixedGroupCount) % FixedGroupCount; - } - - /// - /// 分组优化:使用数组替代字典,预初始化分组容器 - /// - public static List[] GroupDevices(List deviceList) - { - //直接初始化分组,避免动态扩容 - List[] groups = new List[FixedGroupCount]; - for (int i = 0; i < FixedGroupCount; i++) - { - groups[i] = new List(capacity: deviceList.Count / FixedGroupCount + 1); - } - - // 单次遍历直接分配 - foreach (var deviceId in deviceList) - { - int groupId = GetGroupId(deviceId); - groups[groupId].Add(deviceId); - } - - return groups; - } - - /// - /// 通过 deviceId 直接定位分组 - /// - public static List FindDeviceGroup(List[] groups, string deviceId) - { - int groupId = GetGroupId(deviceId); - return groups[groupId]; - } + } } } diff --git a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs new file mode 100644 index 0000000..56fb46f --- /dev/null +++ b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs @@ -0,0 +1,163 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Helpers +{ + /// + /// 设备组负载控制 + /// + public class DeviceGroupBalanceControl + { + /// + /// 设备组数量 + /// + private const int GroupCount = 50; + private static List[] _cachedGroups; + private static Dictionary _balancedMapping; + + /// + /// 初始化缓存并强制均衡 + /// + public static void InitializeCache(List deviceList) + { + // 步骤1: 生成均衡映射表 + _balancedMapping = CreateBalancedMapping(deviceList, GroupCount); + + // 步骤2: 根据映射表填充分组 + _cachedGroups = new List[GroupCount]; + for (int i = 0; i < GroupCount; i++) + { + _cachedGroups[i] = new List(capacity: deviceList.Count / GroupCount + 1); + } + + foreach (var deviceId in deviceList) + { + int groupId = _balancedMapping[deviceId]; + _cachedGroups[groupId].Add(deviceId); + } + } + + /// + /// 通过 deviceId 获取分组 + /// + public static List GetGroup(string deviceId) + { + if (_balancedMapping == null || _cachedGroups == null) + throw new InvalidOperationException("缓存未初始化"); + + int groupId = _balancedMapping[deviceId]; + return _cachedGroups[groupId]; + } + + + /// + /// 创建均衡映射表 + /// + /// 数据集合 + /// 分组数量 + /// 允许的最大偏差百分比 + /// + public static Dictionary CreateBalancedMapping(List deviceList, int groupCount, int maxDeviation = 5) + { + var mapping = new Dictionary(); + int targetPerGroup = deviceList.Count / groupCount; + int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0)); + + // 初始化分组计数器 + int[] groupCounters = new int[groupCount]; + + // 随机数生成器用于平衡分配 + Random rand = new Random(); + + foreach (var deviceId in deviceList) + { + int preferredGroup = GetGroupId(deviceId, groupCount); + + // 如果首选分组未满,直接分配 + if (groupCounters[preferredGroup] < maxAllowed) + { + mapping[deviceId] = preferredGroup; + groupCounters[preferredGroup]++; + } + else + { + // 寻找当前最空闲的分组 + int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min()); + mapping[deviceId] = fallbackGroup; + groupCounters[fallbackGroup]++; + } + } + + return mapping; + } + + /// + /// 分析分组分布 + /// + /// + /// + /// + public static Dictionary AnalyzeDistribution(List deviceList, int groupCount) + { + Dictionary distribution = new Dictionary(); + foreach (var deviceId in deviceList) + { + int groupId = GetGroupId(deviceId, groupCount); + distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1; + } + return distribution; + } + + /// + /// 获取设备ID对应的分组ID + /// + /// + /// + /// + public static int GetGroupId(string deviceId, int groupCount) + { + int hash = Fnv1aHash(deviceId); + // 双重取模确保分布均匀 + return (hash % groupCount + groupCount) % groupCount; + } + + /// + /// FNV-1a哈希算法 + /// + /// + /// + public static int Fnv1aHash(string input) + { + const uint fnvPrime = 16777619; + const uint fnvOffsetBasis = 2166136261; + + uint hash = fnvOffsetBasis; + foreach (char c in input) + { + hash ^= (byte)c; + hash *= fnvPrime; + } + return (int)hash; + } + + /// + /// 打印分组统计数据 + /// + public static void PrintDistributionStats() + { + var stats = _cachedGroups + .Select((group, idx) => new { GroupId = idx, Count = group.Count }) + .OrderBy(x => x.GroupId); + + Console.WriteLine("分组数据量统计:"); + foreach (var stat in stats) + { + Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据"); + } + } + + } +}