diff --git a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
index d2b2b47..2b3962d 100644
--- a/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
+++ b/src/JiShe.CollectBus.Application/Samples/SampleAppService.cs
@@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.IoTDBProvider.Context;
using Microsoft.Extensions.Logging;
+using JiShe.CollectBus.Common.Helpers;
namespace JiShe.CollectBus.Samples;
@@ -83,6 +84,29 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
await _iotDBProvider.InsertAsync(meter);
}
+ ///
+ /// 测试设备分组均衡控制算法
+ ///
+ ///
+ ///
+ [HttpGet]
+ public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
+ {
+ var deviceList = new List();
+ for (int i = 0; i < deviceCount; i++)
+ {
+ deviceList.Add($"Device_{Guid.NewGuid()}");
+ }
+
+ // 初始化缓存
+ DeviceGroupBalanceControl.InitializeCache(deviceList);
+
+ // 打印分布统计
+ DeviceGroupBalanceControl.PrintDistributionStats();
+
+ await Task.CompletedTask;
+ }
+
public Task GetAsync()
{
diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
index ccf7aa5..5af174a 100644
--- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
+++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs
@@ -1,39 +1,23 @@
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Net;
-using System.Threading.Tasks;
-using DotNetCore.CAP;
-using DotNetCore.CAP.Messages;
-using FreeSql;
-using FreeSql.Internal.CommonProvider;
+using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
-using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
-using JiShe.CollectBus.Common.Models;
-using JiShe.CollectBus.Enums;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
-using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
-using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Protocol.Contracts;
-using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
-using JiShe.CollectBus.Workers;
-using MassTransit;
-using MassTransit.Internals.GraphValidation;
-using MassTransit.Transports;
using Microsoft.Extensions.Logging;
-using Volo.Abp.Domain.Repositories;
-using static FreeSql.Internal.GlobalFilter;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading.Tasks;
namespace JiShe.CollectBus.ScheduledMeterReading
{
@@ -43,9 +27,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public abstract class BasicScheduledMeterReadingService : CollectBusAppService, IScheduledMeterReadingService
{
private readonly ILogger _logger;
- private readonly ICapPublisher _producerBus;
+ private readonly ICapPublisher _producerBus;
private readonly IIoTDBProvider _dbProvider;
- private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
+ private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
public BasicScheduledMeterReadingService(
ILogger logger,
@@ -94,7 +78,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
///
public virtual async Task CreateToBeIssueTasks()
{
- var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:TaskInfo:*";
+ var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
if (taskInfos == null || taskInfos.Length <= 0)
{
@@ -185,6 +169,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
}
+ List focusAddressDataList = new List();//用于处理Kafka主题分区数据的分发和处理。
+
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
@@ -198,7 +184,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
- var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
@@ -258,14 +244,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
}
- //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行
+ //在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
{
TimeDensity = itemTimeDensity.Key,
NextTask = DateTime.Now.AddMinutes(1)
};
- var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
+ var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
@@ -279,10 +265,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
{
//获取缓存中的电表信息
- int timeDensity = 5;
+ int timeDensity = 1;
var currentTime = DateTime.Now;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
@@ -324,21 +310,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//_dbProvider.SwitchSessionPool(true);
//await _dbProvider.InsertAsync(meterTaskInfosList);
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
- //缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
_logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
@@ -354,7 +332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5;
var currentTime = DateTime.Now;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
@@ -384,7 +362,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
FocusAddress = ammerterItem.Value.FocusAddress,
TimeDensity = timeDensity.ToString(),
};
- _= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
+ _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@@ -393,21 +371,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
}
@@ -425,7 +396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 15;
var currentDateTime = DateTime.Now;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
@@ -456,7 +427,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
TimeDensity = timeDensity.ToString(),
};
- _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
+ _ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
//_ = _producerBus.Publish(tempMsg);
@@ -465,21 +436,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentDateTime);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
}
//删除任务数据
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
stopwatch.Stop();
@@ -504,7 +468,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timeDensity = 15;
}
- if ( focusGroup == null || focusGroup.Count <= 0)
+ if (focusGroup == null || focusGroup.Count <= 0)
{
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
return;
@@ -572,7 +536,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
foreach (var focusInfo in focusGroup)
{
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
- var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
+ var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
foreach (var ammeterInfo in focusInfo.Value)
{
@@ -732,7 +696,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IssuedMessageHexString = Convert.ToHexString(dataInfos),
};
meterReadingRecords.CreateDataId(GuidGenerator.Create());
-
+
keyValuePairs.TryAdd($"{ammeter.ID}_{tempItem}", meterReadingRecords);
}
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
@@ -817,8 +781,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
{
//获取缓存中的水表信息
- int timeDensity = 5;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
+ int timeDensity = 1;
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
{
@@ -858,21 +822,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
//缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
@@ -887,7 +844,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//获取缓存中的电表信息
int timeDensity = 5;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{
@@ -927,21 +884,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
//缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
@@ -955,7 +905,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
//获取缓存中的电表信息
int timeDensity = 15;
- var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
+ var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
{
@@ -964,7 +914,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
//获取下发任务缓存数据
- Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
+ Dictionary> meterTaskInfos = await GetMeterRedisCacheData(fifteenMinutekeyList, timeDensity.ToString(), MeterTypeEnum.WaterMeter.ToString());
if (meterTaskInfos == null || meterTaskInfos.Count <= 0)
{
_logger.LogError($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理时没有获取到缓存信息,-102");
@@ -995,21 +945,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
{
- await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
+ await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList);
}
//删除任务数据
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
//缓存下一个时间的任务
- TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
- {
- TimeDensity = timeDensity,
- NextTask = DateTime.Now.AddMinutes(timeDensity)
- };
-
- var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
- await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
@@ -1094,7 +1037,37 @@ namespace JiShe.CollectBus.ScheduledMeterReading
return true;
}
+ ///
+ /// 缓存下一个时间的任务
+ ///
+ /// 采集频率
+ /// 表类型
+ ///
+ private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
+ {
+ //缓存下一个时间的任务
+ TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
+ {
+ TimeDensity = timeDensity,
+ NextTask = DateTime.Now.AddMinutes(timeDensity)
+ };
+
+ var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
+ await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
+ }
+
+
+ ///
+ /// 获取缓存表计下发指令缓存key前缀
+ ///
+ ///
+ ///
+ ///
+ private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
+ {
+ return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
+ }
#endregion
-
+
}
}
diff --git a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
index 5cf77b1..33a0a38 100644
--- a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
+++ b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs
@@ -759,52 +759,6 @@ namespace JiShe.CollectBus.Common.Helpers
}
return fontValue;
- }
-
- ///
- /// 固定Kafka主题分组数为50,避免动态计算
- ///
- private const int FixedGroupCount = 50;
-
- ///
- /// 根据 deviceId 获取其所属分组ID(0~49)
- ///
- public static int GetGroupId(string deviceId)
- {
- int hashCode = deviceId.GetHashCode();
- // 更安全的非负取模方式,兼容负数哈希码
- return (hashCode % FixedGroupCount + FixedGroupCount) % FixedGroupCount;
- }
-
- ///
- /// 分组优化:使用数组替代字典,预初始化分组容器
- ///
- public static List[] GroupDevices(List deviceList)
- {
- //直接初始化分组,避免动态扩容
- List[] groups = new List[FixedGroupCount];
- for (int i = 0; i < FixedGroupCount; i++)
- {
- groups[i] = new List(capacity: deviceList.Count / FixedGroupCount + 1);
- }
-
- // 单次遍历直接分配
- foreach (var deviceId in deviceList)
- {
- int groupId = GetGroupId(deviceId);
- groups[groupId].Add(deviceId);
- }
-
- return groups;
- }
-
- ///
- /// 通过 deviceId 直接定位分组
- ///
- public static List FindDeviceGroup(List[] groups, string deviceId)
- {
- int groupId = GetGroupId(deviceId);
- return groups[groupId];
- }
+ }
}
}
diff --git a/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs
new file mode 100644
index 0000000..56fb46f
--- /dev/null
+++ b/src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs
@@ -0,0 +1,163 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JiShe.CollectBus.Common.Helpers
+{
+ ///
+ /// 设备组负载控制
+ ///
+ public class DeviceGroupBalanceControl
+ {
+ ///
+ /// 设备组数量
+ ///
+ private const int GroupCount = 50;
+ private static List[] _cachedGroups;
+ private static Dictionary _balancedMapping;
+
+ ///
+ /// 初始化缓存并强制均衡
+ ///
+ public static void InitializeCache(List deviceList)
+ {
+ // 步骤1: 生成均衡映射表
+ _balancedMapping = CreateBalancedMapping(deviceList, GroupCount);
+
+ // 步骤2: 根据映射表填充分组
+ _cachedGroups = new List[GroupCount];
+ for (int i = 0; i < GroupCount; i++)
+ {
+ _cachedGroups[i] = new List(capacity: deviceList.Count / GroupCount + 1);
+ }
+
+ foreach (var deviceId in deviceList)
+ {
+ int groupId = _balancedMapping[deviceId];
+ _cachedGroups[groupId].Add(deviceId);
+ }
+ }
+
+ ///
+ /// 通过 deviceId 获取分组
+ ///
+ public static List GetGroup(string deviceId)
+ {
+ if (_balancedMapping == null || _cachedGroups == null)
+ throw new InvalidOperationException("缓存未初始化");
+
+ int groupId = _balancedMapping[deviceId];
+ return _cachedGroups[groupId];
+ }
+
+
+ ///
+ /// 创建均衡映射表
+ ///
+ /// 数据集合
+ /// 分组数量
+ /// 允许的最大偏差百分比
+ ///
+ public static Dictionary CreateBalancedMapping(List deviceList, int groupCount, int maxDeviation = 5)
+ {
+ var mapping = new Dictionary();
+ int targetPerGroup = deviceList.Count / groupCount;
+ int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0));
+
+ // 初始化分组计数器
+ int[] groupCounters = new int[groupCount];
+
+ // 随机数生成器用于平衡分配
+ Random rand = new Random();
+
+ foreach (var deviceId in deviceList)
+ {
+ int preferredGroup = GetGroupId(deviceId, groupCount);
+
+ // 如果首选分组未满,直接分配
+ if (groupCounters[preferredGroup] < maxAllowed)
+ {
+ mapping[deviceId] = preferredGroup;
+ groupCounters[preferredGroup]++;
+ }
+ else
+ {
+ // 寻找当前最空闲的分组
+ int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min());
+ mapping[deviceId] = fallbackGroup;
+ groupCounters[fallbackGroup]++;
+ }
+ }
+
+ return mapping;
+ }
+
+ ///
+ /// 分析分组分布
+ ///
+ ///
+ ///
+ ///
+ public static Dictionary AnalyzeDistribution(List deviceList, int groupCount)
+ {
+ Dictionary distribution = new Dictionary();
+ foreach (var deviceId in deviceList)
+ {
+ int groupId = GetGroupId(deviceId, groupCount);
+ distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1;
+ }
+ return distribution;
+ }
+
+ ///
+ /// 获取设备ID对应的分组ID
+ ///
+ ///
+ ///
+ ///
+ public static int GetGroupId(string deviceId, int groupCount)
+ {
+ int hash = Fnv1aHash(deviceId);
+ // 双重取模确保分布均匀
+ return (hash % groupCount + groupCount) % groupCount;
+ }
+
+ ///
+ /// FNV-1a哈希算法
+ ///
+ ///
+ ///
+ public static int Fnv1aHash(string input)
+ {
+ const uint fnvPrime = 16777619;
+ const uint fnvOffsetBasis = 2166136261;
+
+ uint hash = fnvOffsetBasis;
+ foreach (char c in input)
+ {
+ hash ^= (byte)c;
+ hash *= fnvPrime;
+ }
+ return (int)hash;
+ }
+
+ ///
+ /// 打印分组统计数据
+ ///
+ public static void PrintDistributionStats()
+ {
+ var stats = _cachedGroups
+ .Select((group, idx) => new { GroupId = idx, Count = group.Count })
+ .OrderBy(x => x.GroupId);
+
+ Console.WriteLine("分组数据量统计:");
+ foreach (var stat in stats)
+ {
+ Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
+ }
+ }
+
+ }
+}