实现设备组负载控制,用于Kafka主题分区设备数量的均衡
This commit is contained in:
parent
d3cd390312
commit
7e0aa8169b
@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Mvc;
|
|||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using JiShe.CollectBus.IoTDBProvider.Context;
|
using JiShe.CollectBus.IoTDBProvider.Context;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Samples;
|
namespace JiShe.CollectBus.Samples;
|
||||||
|
|
||||||
@ -83,6 +84,29 @@ public class SampleAppService : CollectBusAppService, ISampleAppService
|
|||||||
await _iotDBProvider.InsertAsync(meter);
|
await _iotDBProvider.InsertAsync(meter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 测试设备分组均衡控制算法
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceCount"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
[HttpGet]
|
||||||
|
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
|
||||||
|
{
|
||||||
|
var deviceList = new List<string>();
|
||||||
|
for (int i = 0; i < deviceCount; i++)
|
||||||
|
{
|
||||||
|
deviceList.Add($"Device_{Guid.NewGuid()}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 初始化缓存
|
||||||
|
DeviceGroupBalanceControl.InitializeCache(deviceList);
|
||||||
|
|
||||||
|
// 打印分布统计
|
||||||
|
DeviceGroupBalanceControl.PrintDistributionStats();
|
||||||
|
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Task<SampleDto> GetAsync()
|
public Task<SampleDto> GetAsync()
|
||||||
{
|
{
|
||||||
|
|||||||
@ -1,39 +1,23 @@
|
|||||||
using System;
|
using DotNetCore.CAP;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Diagnostics;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Net;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using DotNetCore.CAP;
|
|
||||||
using DotNetCore.CAP.Messages;
|
|
||||||
using FreeSql;
|
|
||||||
using FreeSql.Internal.CommonProvider;
|
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common;
|
|
||||||
using JiShe.CollectBus.Common.BuildSendDatas;
|
using JiShe.CollectBus.Common.BuildSendDatas;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
using JiShe.CollectBus.Common.Enums;
|
using JiShe.CollectBus.Common.Enums;
|
||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
|
||||||
using JiShe.CollectBus.Enums;
|
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
|
||||||
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
using JiShe.CollectBus.IotSystems.MessageIssueds;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
using JiShe.CollectBus.IotSystems.Watermeter;
|
using JiShe.CollectBus.IotSystems.Watermeter;
|
||||||
using JiShe.CollectBus.Protocol.Contracts;
|
using JiShe.CollectBus.Protocol.Contracts;
|
||||||
using JiShe.CollectBus.Repository;
|
|
||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using JiShe.CollectBus.Workers;
|
|
||||||
using MassTransit;
|
|
||||||
using MassTransit.Internals.GraphValidation;
|
|
||||||
using MassTransit.Transports;
|
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using System;
|
||||||
using static FreeSql.Internal.GlobalFilter;
|
using System.Collections.Generic;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.ScheduledMeterReading
|
namespace JiShe.CollectBus.ScheduledMeterReading
|
||||||
{
|
{
|
||||||
@ -94,7 +78,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public virtual async Task CreateToBeIssueTasks()
|
public virtual async Task CreateToBeIssueTasks()
|
||||||
{
|
{
|
||||||
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:TaskInfo:*";
|
var redisCacheKey = $"{RedisConst.CacheBasicDirectoryKey}{SystemType}:{ServerTagName}:TaskInfo:*";
|
||||||
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
|
var taskInfos = await FreeRedisProvider.Instance.KeysAsync(redisCacheKey);
|
||||||
if (taskInfos == null || taskInfos.Length <= 0)
|
if (taskInfos == null || taskInfos.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -185,6 +169,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
|
throw new NullReferenceException($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据时,采集项类型数据为空");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<string> focusAddressDataList = new List<string>();//用于处理Kafka主题分区数据的分发和处理。
|
||||||
|
|
||||||
//根据采集频率分组,获得采集频率分组
|
//根据采集频率分组,获得采集频率分组
|
||||||
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
|
||||||
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
|
foreach (var itemTimeDensity in meterInfoGroupByTimeDensity)
|
||||||
@ -198,7 +184,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
|
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
|
||||||
|
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
//每次缓存时,删除缓存,避免缓存数据有不准确的问题
|
||||||
@ -258,14 +244,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
await FreeRedisProvider.Instance.HSetAsync(redisCacheKey, keyValuePairs);
|
||||||
}
|
}
|
||||||
|
|
||||||
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行
|
//在缓存表信息数据的时候,新增下一个时间的自动处理任务,1分钟后执行所有的采集频率任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
{
|
{
|
||||||
TimeDensity = itemTimeDensity.Key,
|
TimeDensity = itemTimeDensity.Key,
|
||||||
NextTask = DateTime.Now.AddMinutes(1)
|
NextTask = DateTime.Now.AddMinutes(1)
|
||||||
};
|
};
|
||||||
|
|
||||||
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
|
var taskRedisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key);
|
||||||
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
|
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,10 +265,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
|
public virtual async Task AmmeterScheduledMeterOneMinuteReading()
|
||||||
{
|
{
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 5;
|
int timeDensity = 1;
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
|
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -324,21 +310,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
//_dbProvider.SwitchSessionPool(true);
|
//_dbProvider.SwitchSessionPool(true);
|
||||||
//await _dbProvider.InsertAsync(meterTaskInfosList);
|
//await _dbProvider.InsertAsync(meterTaskInfosList);
|
||||||
|
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
|
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
//删除任务数据
|
||||||
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
||||||
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
|
_logger.LogInformation($"{nameof(AmmeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
|
||||||
|
|
||||||
@ -354,7 +332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
int timeDensity = 5;
|
int timeDensity = 5;
|
||||||
var currentTime = DateTime.Now;
|
var currentTime = DateTime.Now;
|
||||||
|
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -384,7 +362,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
FocusAddress = ammerterItem.Value.FocusAddress,
|
FocusAddress = ammerterItem.Value.FocusAddress,
|
||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
_= _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -393,21 +371,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentTime);
|
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
//删除任务数据
|
||||||
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
|
_logger.LogInformation($"{nameof(AmmeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集电表数据处理完成");
|
||||||
}
|
}
|
||||||
@ -425,7 +396,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var currentDateTime = DateTime.Now;
|
var currentDateTime = DateTime.Now;
|
||||||
|
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -456,7 +427,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
TimeDensity = timeDensity.ToString(),
|
TimeDensity = timeDensity.ToString(),
|
||||||
};
|
};
|
||||||
|
|
||||||
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500) ,ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
_ = _producerBus.PublishDelayAsync(TimeSpan.FromMicroseconds(500), ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, tempMsg);
|
||||||
|
|
||||||
//_ = _producerBus.Publish(tempMsg);
|
//_ = _producerBus.Publish(tempMsg);
|
||||||
|
|
||||||
@ -465,21 +436,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
}
|
}
|
||||||
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
if (meterTaskInfosList != null && meterTaskInfosList.Count > 0)
|
||||||
{
|
{
|
||||||
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList,currentDateTime);
|
await _meterReadingRecordRepository.InsertManyAsync(meterTaskInfosList, currentDateTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
//删除任务数据
|
//删除任务数据
|
||||||
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
//await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.Ammeter);
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.Ammeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
|
|
||||||
@ -504,7 +468,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
timeDensity = 15;
|
timeDensity = 15;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( focusGroup == null || focusGroup.Count <= 0)
|
if (focusGroup == null || focusGroup.Count <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
_logger.LogError($"{nameof(AmmerterScheduledMeterReadingIssued)} 电表数据采集指令生成失败,参数异常,-101");
|
||||||
return;
|
return;
|
||||||
@ -572,7 +536,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
foreach (var focusInfo in focusGroup)
|
foreach (var focusInfo in focusGroup)
|
||||||
{
|
{
|
||||||
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
//构建缓存任务key,依然 表计类型+采集频率+集中器地址,存hash类型
|
||||||
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
|
var redisCacheKey = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, timeDensity)}{focusInfo.Key}";
|
||||||
|
|
||||||
foreach (var ammeterInfo in focusInfo.Value)
|
foreach (var ammeterInfo in focusInfo.Value)
|
||||||
{
|
{
|
||||||
@ -817,8 +781,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
|
public virtual async Task WatermeterScheduledMeterOneMinuteReading()
|
||||||
{
|
{
|
||||||
//获取缓存中的水表信息
|
//获取缓存中的水表信息
|
||||||
int timeDensity = 5;
|
int timeDensity = 1;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var oneMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
if (oneMinutekeyList == null || oneMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -865,14 +829,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(oneMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
_logger.LogInformation($"{nameof(WatermeterScheduledMeterOneMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
||||||
@ -887,7 +844,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
|
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 5;
|
int timeDensity = 5;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -934,14 +891,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(fiveMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
||||||
@ -955,7 +905,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
{
|
{
|
||||||
//获取缓存中的电表信息
|
//获取缓存中的电表信息
|
||||||
int timeDensity = 15;
|
int timeDensity = 15;
|
||||||
var redisKeyList = $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity)}*";
|
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
var fifteenMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
|
||||||
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
if (fifteenMinutekeyList == null || fifteenMinutekeyList.Length <= 0)
|
||||||
{
|
{
|
||||||
@ -1002,14 +952,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
await FreeRedisProvider.Instance.DelAsync(fifteenMinutekeyList);
|
||||||
|
|
||||||
//缓存下一个时间的任务
|
//缓存下一个时间的任务
|
||||||
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
await CacheNextTaskData(timeDensity, MeterTypeEnum.WaterMeter);
|
||||||
{
|
|
||||||
TimeDensity = timeDensity,
|
|
||||||
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
|
||||||
};
|
|
||||||
|
|
||||||
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, MeterTypeEnum.WaterMeter, timeDensity);
|
|
||||||
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
_logger.LogInformation($"{nameof(WatermeterScheduledMeterFiveMinuteReading)} {timeDensity}分钟采集水表数据处理完成");
|
||||||
@ -1094,6 +1037,36 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 缓存下一个时间的任务
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeDensity">采集频率</param>
|
||||||
|
/// <param name="meterType">表类型</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private async Task CacheNextTaskData(int timeDensity, MeterTypeEnum meterType)
|
||||||
|
{
|
||||||
|
//缓存下一个时间的任务
|
||||||
|
TasksToBeIssueModel nextTask = new TasksToBeIssueModel()
|
||||||
|
{
|
||||||
|
TimeDensity = timeDensity,
|
||||||
|
NextTask = DateTime.Now.AddMinutes(timeDensity)
|
||||||
|
};
|
||||||
|
|
||||||
|
var redisCacheKey = string.Format(RedisConst.CacheTasksToBeIssuedKey, SystemType, ServerTagName, meterType, timeDensity);
|
||||||
|
await FreeRedisProvider.Instance.SetAsync(redisCacheKey, nextTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取缓存表计下发指令缓存key前缀
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeDensity"></param>
|
||||||
|
/// <param name="meterType"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private string GetTelemetryPacketCacheKeyPrefix(int timeDensity, MeterTypeEnum meterType)
|
||||||
|
{
|
||||||
|
return $"{string.Format(RedisConst.CacheTelemetryPacketInfoKey, SystemType, ServerTagName, meterType, timeDensity)}*";
|
||||||
|
}
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -760,51 +760,5 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
|
|
||||||
return fontValue;
|
return fontValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 固定Kafka主题分组数为50,避免动态计算
|
|
||||||
/// </summary>
|
|
||||||
private const int FixedGroupCount = 50;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 根据 deviceId 获取其所属分组ID(0~49)
|
|
||||||
/// </summary>
|
|
||||||
public static int GetGroupId(string deviceId)
|
|
||||||
{
|
|
||||||
int hashCode = deviceId.GetHashCode();
|
|
||||||
// 更安全的非负取模方式,兼容负数哈希码
|
|
||||||
return (hashCode % FixedGroupCount + FixedGroupCount) % FixedGroupCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 分组优化:使用数组替代字典,预初始化分组容器
|
|
||||||
/// </summary>
|
|
||||||
public static List<string>[] GroupDevices(List<string> deviceList)
|
|
||||||
{
|
|
||||||
//直接初始化分组,避免动态扩容
|
|
||||||
List<string>[] groups = new List<string>[FixedGroupCount];
|
|
||||||
for (int i = 0; i < FixedGroupCount; i++)
|
|
||||||
{
|
|
||||||
groups[i] = new List<string>(capacity: deviceList.Count / FixedGroupCount + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 单次遍历直接分配
|
|
||||||
foreach (var deviceId in deviceList)
|
|
||||||
{
|
|
||||||
int groupId = GetGroupId(deviceId);
|
|
||||||
groups[groupId].Add(deviceId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return groups;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 通过 deviceId 直接定位分组
|
|
||||||
/// </summary>
|
|
||||||
public static List<string> FindDeviceGroup(List<string>[] groups, string deviceId)
|
|
||||||
{
|
|
||||||
int groupId = GetGroupId(deviceId);
|
|
||||||
return groups[groupId];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
163
src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs
Normal file
163
src/JiShe.CollectBus.Common/Helpers/DeviceGroupBalanceControl.cs
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.Common.Helpers
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 设备组负载控制
|
||||||
|
/// </summary>
|
||||||
|
public class DeviceGroupBalanceControl
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 设备组数量
|
||||||
|
/// </summary>
|
||||||
|
private const int GroupCount = 50;
|
||||||
|
private static List<string>[] _cachedGroups;
|
||||||
|
private static Dictionary<string, int> _balancedMapping;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 初始化缓存并强制均衡
|
||||||
|
/// </summary>
|
||||||
|
public static void InitializeCache(List<string> deviceList)
|
||||||
|
{
|
||||||
|
// 步骤1: 生成均衡映射表
|
||||||
|
_balancedMapping = CreateBalancedMapping(deviceList, GroupCount);
|
||||||
|
|
||||||
|
// 步骤2: 根据映射表填充分组
|
||||||
|
_cachedGroups = new List<string>[GroupCount];
|
||||||
|
for (int i = 0; i < GroupCount; i++)
|
||||||
|
{
|
||||||
|
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / GroupCount + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var deviceId in deviceList)
|
||||||
|
{
|
||||||
|
int groupId = _balancedMapping[deviceId];
|
||||||
|
_cachedGroups[groupId].Add(deviceId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 通过 deviceId 获取分组
|
||||||
|
/// </summary>
|
||||||
|
public static List<string> GetGroup(string deviceId)
|
||||||
|
{
|
||||||
|
if (_balancedMapping == null || _cachedGroups == null)
|
||||||
|
throw new InvalidOperationException("缓存未初始化");
|
||||||
|
|
||||||
|
int groupId = _balancedMapping[deviceId];
|
||||||
|
return _cachedGroups[groupId];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 创建均衡映射表
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceList">数据集合</param>
|
||||||
|
/// <param name="groupCount">分组数量</param>
|
||||||
|
/// <param name="maxDeviation">允许的最大偏差百分比</param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static Dictionary<string, int> CreateBalancedMapping(List<string> deviceList, int groupCount, int maxDeviation = 5)
|
||||||
|
{
|
||||||
|
var mapping = new Dictionary<string, int>();
|
||||||
|
int targetPerGroup = deviceList.Count / groupCount;
|
||||||
|
int maxAllowed = (int)(targetPerGroup * (1 + maxDeviation / 100.0));
|
||||||
|
|
||||||
|
// 初始化分组计数器
|
||||||
|
int[] groupCounters = new int[groupCount];
|
||||||
|
|
||||||
|
// 随机数生成器用于平衡分配
|
||||||
|
Random rand = new Random();
|
||||||
|
|
||||||
|
foreach (var deviceId in deviceList)
|
||||||
|
{
|
||||||
|
int preferredGroup = GetGroupId(deviceId, groupCount);
|
||||||
|
|
||||||
|
// 如果首选分组未满,直接分配
|
||||||
|
if (groupCounters[preferredGroup] < maxAllowed)
|
||||||
|
{
|
||||||
|
mapping[deviceId] = preferredGroup;
|
||||||
|
groupCounters[preferredGroup]++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 寻找当前最空闲的分组
|
||||||
|
int fallbackGroup = Array.IndexOf(groupCounters, groupCounters.Min());
|
||||||
|
mapping[deviceId] = fallbackGroup;
|
||||||
|
groupCounters[fallbackGroup]++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mapping;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 分析分组分布
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceList"></param>
|
||||||
|
/// <param name="groupCount"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static Dictionary<int, int> AnalyzeDistribution(List<string> deviceList, int groupCount)
|
||||||
|
{
|
||||||
|
Dictionary<int, int> distribution = new Dictionary<int, int>();
|
||||||
|
foreach (var deviceId in deviceList)
|
||||||
|
{
|
||||||
|
int groupId = GetGroupId(deviceId, groupCount);
|
||||||
|
distribution[groupId] = distribution.TryGetValue(groupId, out var count) ? count + 1 : 1;
|
||||||
|
}
|
||||||
|
return distribution;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 获取设备ID对应的分组ID
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceId"></param>
|
||||||
|
/// <param name="groupCount"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static int GetGroupId(string deviceId, int groupCount)
|
||||||
|
{
|
||||||
|
int hash = Fnv1aHash(deviceId);
|
||||||
|
// 双重取模确保分布均匀
|
||||||
|
return (hash % groupCount + groupCount) % groupCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// FNV-1a哈希算法
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
public static int Fnv1aHash(string input)
|
||||||
|
{
|
||||||
|
const uint fnvPrime = 16777619;
|
||||||
|
const uint fnvOffsetBasis = 2166136261;
|
||||||
|
|
||||||
|
uint hash = fnvOffsetBasis;
|
||||||
|
foreach (char c in input)
|
||||||
|
{
|
||||||
|
hash ^= (byte)c;
|
||||||
|
hash *= fnvPrime;
|
||||||
|
}
|
||||||
|
return (int)hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 打印分组统计数据
|
||||||
|
/// </summary>
|
||||||
|
public static void PrintDistributionStats()
|
||||||
|
{
|
||||||
|
var stats = _cachedGroups
|
||||||
|
.Select((group, idx) => new { GroupId = idx, Count = group.Count })
|
||||||
|
.OrderBy(x => x.GroupId);
|
||||||
|
|
||||||
|
Console.WriteLine("分组数据量统计:");
|
||||||
|
foreach (var stat in stats)
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Group {stat.GroupId}: {stat.Count} 条数据");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user