dev #2

Merged
admin merged 176 commits from dev into master 2025-04-18 01:31:49 +00:00
11 changed files with 103 additions and 32 deletions
Showing only changes of commit 8257d6b129 - Show all commits

View File

@ -184,6 +184,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue;
}
focusAddressDataList.Add(item.Key);
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG
@ -255,6 +257,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
}
//初始化设备组负载控制
if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
}
@ -332,7 +345,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5;
var currentTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{

View File

@ -4,6 +4,7 @@ using System.Threading.Tasks;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider;
@ -15,6 +16,7 @@ using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories;
@ -169,5 +171,27 @@ namespace JiShe.CollectBus.ScheduledMeterReading
.Ado
.QueryAsync<WatermeterInfo>(sql);
}
/// <summary>
/// 测试设备分组均衡控制算法
/// </summary>
/// <param name="deviceCount"></param>
/// <returns></returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
var deviceList = new List<string>();
for (int i = 0; i < deviceCount; i++)
{
deviceList.Add($"Device_{Guid.NewGuid()}");
}
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
}
}

View File

@ -43,6 +43,11 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary>
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
/// <summary>
/// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary>
public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
}
}

View File

@ -1,8 +1,10 @@
using System;
using JiShe.CollectBus.FreeRedisProvider;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.Helpers
{
@ -10,38 +12,42 @@ namespace JiShe.CollectBus.Common.Helpers
/// 设备组负载控制
/// </summary>
public class DeviceGroupBalanceControl
{
{
/// <summary>
/// 设备组数量
/// 分组集合
/// </summary>
private const int GroupCount = 50;
private static List<string>[] _cachedGroups;
/// <summary>
/// 设备分组关系映射
/// </summary>
private static Dictionary<string, int> _balancedMapping;
/// <summary>
/// 初始化缓存并强制均衡
/// </summary>
public static void InitializeCache(List<string> deviceList)
public static void InitializeCache(List<string> deviceList,int groupCount = 50)
{
// 步骤1: 生成均衡映射表
_balancedMapping = CreateBalancedMapping(deviceList, GroupCount);
_balancedMapping = CreateBalancedMapping(deviceList, groupCount);
// 步骤2: 根据映射表填充分组
_cachedGroups = new List<string>[GroupCount];
for (int i = 0; i < GroupCount; i++)
_cachedGroups = new List<string>[groupCount];
for (int i = 0; i < groupCount; i++)
{
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / GroupCount + 1);
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / groupCount + 1);
}
foreach (var deviceId in deviceList)
{
int groupId = _balancedMapping[deviceId];
_cachedGroups[groupId].Add(deviceId);
}
}
}
/// <summary>
/// 通过 deviceId 获取分组
/// 通过 deviceId 获取所在的分组集合
/// </summary>
public static List<string> GetGroup(string deviceId)
{
@ -52,6 +58,17 @@ namespace JiShe.CollectBus.Common.Helpers
return _cachedGroups[groupId];
}
/// <summary>
/// 通过 deviceId 获取分组Id
/// </summary>
public static int GetDeviceGroupId(string deviceId)
{
if (_balancedMapping == null || _cachedGroups == null)
throw new InvalidOperationException("缓存未初始化");
return _balancedMapping[deviceId];
}
/// <summary>
/// 创建均衡映射表
@ -69,9 +86,6 @@ namespace JiShe.CollectBus.Common.Helpers
// 初始化分组计数器
int[] groupCounters = new int[groupCount];
// 随机数生成器用于平衡分配
Random rand = new Random();
foreach (var deviceId in deviceList)
{
int preferredGroup = GetGroupId(deviceId, groupCount);

View File

@ -30,6 +30,8 @@ namespace JiShe.CollectBus.Common.Helpers
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
};
}
@ -67,6 +69,8 @@ namespace JiShe.CollectBus.Common.Helpers
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
};
}

View File

@ -25,6 +25,11 @@
<PackageReference Include="Serilog" Version="4.1.0" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
</ItemGroup>
</Project>

View File

@ -375,11 +375,10 @@ namespace JiShe.CollectBus.Host
}).Build();
try
{
string serverTagName = configuration.GetSection(CommonConst.ServerTagName).Value!;
{
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics)

View File

@ -45,7 +45,7 @@ namespace JiShe.CollectBus.Host
ConfigureHangfire(context);
ConfigureCap(context, configuration);
//ConfigureMassTransit(context, configuration);
ConfigureKafkaTopic(context, configuration);
//ConfigureKafkaTopic(context, configuration);
ConfigureAuditLog(context);
ConfigureCustom(context, configuration);
}

View File

@ -35,12 +35,12 @@
},
"ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"Kafka": "192.168.0.151:29092,192.168.0.151:39092,192.168.0.151:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
},
"Redis": {
"Configuration": "120.24.52.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"Configuration": "192.168.0.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14",
"HangfireDB": "15"
},
@ -86,16 +86,17 @@
"SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN",
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980"
"SaslPassword": "lixiao1980",
"ServerTagName": "JiSheCollectBus",
"NumPartitions": 50
},
"IoTDBOptions": {
"UserName": "root",
"Password": "root",
"ClusterList": [ "192.168.56.102:6667" ],
"ClusterList": [ "192.168.0.151:6667" ],
"PoolSize": 2,
"DataBaseName": "energy",
"OpenDebugMode": true,
"UseTableSessionPoolByDefault": false
},
"ServerTagName": "JiSheCollectBus"
}
}

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary>
public static List<string> GetAllTopicNamesByIssued(string serverTagName)
public static List<string> GetAllTopicNamesByIssued()
{
List<string> topics = typeof(ProtocolConst)
.GetFields(BindingFlags.Public | BindingFlags.Static)
@ -24,7 +24,6 @@ namespace JiShe.CollectBus.Protocol.Contracts
!f.IsInitOnly &&
f.FieldType == typeof(string) &&
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
//.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}")
.Select(f => (string)f.GetRawConstantValue()!)
.ToList();
@ -35,7 +34,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary>
public static List<string> GetAllTopicNamesByReceived(string serverTagName)
public static List<string> GetAllTopicNamesByReceived()
{
//固定的上报主题
var topicList = typeof(ProtocolConst)
@ -50,13 +49,20 @@ namespace JiShe.CollectBus.Protocol.Contracts
//动态上报主题需根据协议的AFN功能码动态获取
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
//需要排除的AFN功能码
var excludeItems = new List<int>() { 6, 7, 8,15 };
foreach (var item in afnList)
{
if (excludeItems.Contains(item.Value))
{
continue;
}
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
}
//return topicList.Select(f => $"{serverTagName}.{f}").ToList();
return topicList;
}
}

View File

@ -19,7 +19,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
public const string SubscriberLoginIssuedEventName = "issued.login.event";
/// <summary>
/// 上行消息主题
/// 上行消息主题,测试使用
/// </summary>
public const string SubscriberReceivedEventName = "received.event";