2025-04-17 20:28:50 +08:00

427 lines
15 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.DeviceBalanceControl
{
/// <summary>
/// 设备组负载控制
/// </summary>
public class DeviceGroupBalanceControl
{
private static readonly object _syncRoot = new object();
private static volatile CacheState _currentCache;
/// <summary>
/// 使用ConcurrentDictionary保证线程安全的设备分组映射
/// </summary>
private sealed class CacheState
{
public readonly ConcurrentDictionary<string, int> BalancedMapping;
public readonly List<string>[] CachedGroups;
public CacheState(int groupCount)
{
BalancedMapping = new ConcurrentDictionary<string, int>();
CachedGroups = new List<string>[groupCount];
for (int i = 0; i < groupCount; i++)
{
CachedGroups[i] = new List<string>();
}
}
}
/// <summary>
/// 初始化或增量更新缓存
/// </summary>
public static void InitializeCache(List<string> deviceList, int groupCount = 30)
{
if (deviceList == null || deviceList.Count <= 0)
{
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化失败,设备数据为空");
}
if (groupCount > 60 || groupCount <= 0)
{
groupCount = 60;
}
lock (_syncRoot)
{
// 首次初始化
if (_currentCache == null)
{
var newCache = new CacheState(groupCount);
UpdateCacheWithDevices(newCache, deviceList, groupCount);
_currentCache = newCache;
}
// 后续增量更新
else
{
if (_currentCache.CachedGroups.Length != groupCount)
{
throw new ArgumentException($"{nameof(InitializeCache)} 设备分组初始化完成以后,分组数量不能更改");
}
var clonedCache = CloneExistingCache();
UpdateCacheWithDevices(clonedCache, deviceList, groupCount);
_currentCache = clonedCache;
}
}
}
/// <summary>
/// 带锁的缓存克隆(写入时复制)
/// </summary>
private static CacheState CloneExistingCache()
{
var oldCache = _currentCache;
var newCache = new CacheState(oldCache.CachedGroups.Length);
// 复制已有映射
foreach (var kvp in oldCache.BalancedMapping)
{
newCache.BalancedMapping.TryAdd(kvp.Key, kvp.Value);
}
// 复制分组数据
for (int i = 0; i < oldCache.CachedGroups.Length; i++)
{
newCache.CachedGroups[i].AddRange(oldCache.CachedGroups[i]);
}
return newCache;
}
/// <summary>
/// 更新设备到缓存
/// </summary>
private static void UpdateCacheWithDevices(CacheState cache, List<string> deviceList, int groupCount)
{
foreach (var deviceId in deviceList)
{
// 原子操作:如果设备不存在则计算分组
cache.BalancedMapping.GetOrAdd(deviceId, id =>
{
int groupId = GetGroupId(id, groupCount);
lock (cache.CachedGroups[groupId])
{
cache.CachedGroups[groupId].Add(id);
}
return groupId;
});
}
}
/// <summary>
/// 并行处理泛型数据集(支持动态线程分配)
/// </summary>
/// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托参数当前对象线程ID</param>
/// <param name="maxThreads">可选线程限制</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessGenericListAsync<T>(
List<T> items, Func<T, string> deviceIdSelector, Action<T, int> processor, int? maxThreads = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// 创建分组任务队列
var groupQueues = new ConcurrentQueue<T>[cache.CachedGroups.Length];
for (int i = 0; i < groupQueues.Length; i++)
{
groupQueues[i] = new ConcurrentQueue<T>();
}
// 阶段1分发数据到分组队列
Parallel.ForEach(items, item =>
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
groupQueues[groupId].Enqueue(item);
}
});
if ((maxThreads.HasValue && maxThreads.Value > cache.CachedGroups.Length) || maxThreads.HasValue == false)
{
maxThreads = cache.CachedGroups.Length;
}
// 阶段2并行处理队列
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxThreads.Value,
};
await Task.Run(() =>
{
Parallel.For(0, cache.CachedGroups.Length, options, async groupId =>
{
var queue = groupQueues[groupId];
while (queue.TryDequeue(out T item))
{
processor(item, groupId);
}
});
});
}
/// <summary>
/// 智能节流处理CPU友好型
/// </summary>
/// <typeparam name="T">已经分组的设备信息</typeparam>
/// <param name="items">部分或者全部的已经分组的设备集合</param>
/// <param name="deviceIdSelector">从泛型对象提取deviceId</param>
/// <param name="processor">处理委托参数当前对象分组ID</param>
/// <param name="maxConcurrency">可选最佳并发度</param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
public static async Task ProcessWithThrottleAsync<T>(
List<T> items,
Func<T, string> deviceIdSelector,
Action<T,int> processor,
int? maxConcurrency = null)
{
var cache = _currentCache ?? throw new InvalidOperationException("缓存未初始化");
// 自动计算最佳并发度
int recommendedThreads = CalculateOptimalThreadCount();
if ((maxConcurrency.HasValue && maxConcurrency.Value > cache.CachedGroups.Length) || maxConcurrency.HasValue == false)
{
maxConcurrency = cache.CachedGroups.Length;
}
int actualThreads = maxConcurrency ?? recommendedThreads;
// 创建节流器
using var throttler = new SemaphoreSlim(initialCount: actualThreads);
// 使用LongRunning避免线程池饥饿
var tasks = items.Select(async item =>
{
await throttler.WaitAsync();
try
{
var deviceId = deviceIdSelector(item);
if (cache.BalancedMapping.TryGetValue(deviceId, out int groupId))
{
// 分组级处理(保持顺序性)
await ProcessItemAsync(item, processor, groupId);
}
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
/// <summary>
/// 自动计算最优线程数
/// </summary>
public static int CalculateOptimalThreadCount()
{
int coreCount = Environment.ProcessorCount;
return Math.Min(
coreCount * 2, // 超线程优化
_currentCache?.CachedGroups.Length ?? 60
);
}
/// <summary>
/// 分组异步处理(带节流)
/// </summary>
private static async Task ProcessItemAsync<T>(T item, Action<T,int> processor, int groupId)
{
// 使用内存缓存降低CPU负载
await Task.Yield(); // 立即释放当前线程
// 分组处理上下文
var context = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ =>
{
ExecutionContext.Run(context!, state =>
{
processor(item,groupId);
}, null);
});
}
/// <summary>
/// 通过 deviceId 获取所在的分组集合
/// </summary>
public static List<string> GetGroup(string deviceId)
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
return cache.CachedGroups[cache.BalancedMapping[deviceId]];
}
/// <summary>
/// 通过 deviceId 获取分组Id
/// </summary>
public static int GetDeviceGroupId(string deviceId)
{
var cache = _currentCache;
if (cache == null)
throw new InvalidOperationException("缓存未初始化");
return cache.BalancedMapping[deviceId];
}
/// <summary>
/// 创建均衡映射表
/// </summary>
/// <param name="deviceList">数据集合</param>
/// <param name="groupCount">分组数量</param>
/// <param name="maxDeviation">允许的最大偏差百分比</param>
/// <returns></returns>
public static Dictionary<string, int> CreateBalancedMapping(List<string> deviceList, int groupCount, int maxDeviation = 5)
{
var mapping = new Dictionary<string, int>();
int targetPerGroup = deviceList.Count / groupCount;
int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0));
// 初始化分组计数器
int[] groupCounters = new int[groupCount];
foreach (var deviceId in deviceList)
{
int preferredGroup = GetGroupId(deviceId, groupCount);
// 如果首选分组未满,直接分配
if (groupCounters[preferredGroup] < maxAllowed)
{
mapping[deviceId] = preferredGroup;
groupCounters[preferredGroup]++;
}
else
{
// 寻找当前最空闲的分组
int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min());
mapping[deviceId] = fallbackGroup;
groupCounters[fallbackGroup]++;
}
}
return mapping;
}
/// <summary>
/// 分析分组分布
/// </summary>
/// <param name="deviceList"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static Dictionary<int, int> AnalyzeDistribution(List<string> deviceList, int groupCount)
{
Dictionary<int, int> distribution = new Dictionary<int, int>();
foreach (var deviceId in deviceList)
{
int groupId = GetGroupId(deviceId, groupCount);
distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1;
}
return distribution;
}
/// <summary>
/// 获取设备ID对应的分组ID
/// </summary>
/// <param name="deviceId"></param>
/// <param name="groupCount"></param>
/// <returns></returns>
public static int GetGroupId(string deviceId, int groupCount)
{
int hash = Fnv1aHash(deviceId);
// 双重取模确保分布均匀
return (hash % groupCount + groupCount) % groupCount;
}
/// <summary>
/// FNV-1a哈希算法
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static int Fnv1aHash(string input)
{
const uint fnvPrime = 16777619;
const uint fnvOffsetBasis = 2166136261;
uint hash = fnvOffsetBasis;
foreach (char c in input)
{
hash ^= (byte)c;
hash *= fnvPrime;
}
return (int)hash;
}
/// <summary>
/// CRC16算法实现
/// </summary>
/// <param name="bytes"></param>
/// <returns></returns>
public static ushort CRC16Hash(byte[] bytes)
{
ushort crc = 0xFFFF;
for (int i = 0; i < bytes.Length; i++)
{
crc ^= bytes[i];
for (int j = 0; j < 8; j++)
{
if ((crc & 0x0001) == 1)
{
crc = (ushort)((crc >> 1) ^ 0xA001);
}
else
{
crc >>= 1;
}
}
}
return crc;
}
/// <summary>
/// 打印分组统计数据
/// </summary>
public static void PrintDistributionStats()
{
var cache = _currentCache;
if (cache == null)
{
Console.WriteLine("缓存未初始化");
return;
}
var stats = cache.CachedGroups
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
.OrderBy(x => x.GroupId);
Console.WriteLine("分组数据量统计:");
foreach (var stat in stats)
{
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
}
Console.WriteLine($"总共: {stats.Sum(d=>d.Count)} 条数据");
}
}
}