修改代码

This commit is contained in:
ChenYi 2025-04-09 17:29:30 +08:00
parent 4cd7889dd4
commit d3cd390312
6 changed files with 78 additions and 33 deletions

View File

@ -64,6 +64,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading
/// </summary> /// </summary>
public abstract string SystemType { get; } public abstract string SystemType { get; }
/// <summary>
/// 应用服务器部署标记
/// </summary>
public abstract string ServerTagName { get; }
/// <summary> /// <summary>
///电表日冻结采集项 ///电表日冻结采集项
/// </summary> /// </summary>

View File

@ -15,6 +15,7 @@ using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit; using MassTransit;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
using Volo.Abp.Uow; using Volo.Abp.Uow;
@ -28,15 +29,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//[Route($"/energy/app/scheduled")] //[Route($"/energy/app/scheduled")]
public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService
{ {
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger, public EnergySystemScheduledMeterReadingService(ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider)
{ {
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
} }
public sealed override string SystemType => SystemTypeConst.Energy; public sealed override string SystemType => SystemTypeConst.Energy;
public sealed override string ServerTagName => serverTagName;
/// <summary> /// <summary>
/// 获取采集项列表 /// 获取采集项列表
/// </summary> /// </summary>

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.Common.Consts
{
/// <summary>
/// 常用常量管理
/// </summary>
public class CommonConst
{
/// <summary>
/// 服务器标识
/// </summary>
public const string ServerTagName = "ServerTagName";
/// <summary>
/// Kafka
/// </summary>
public const string Kafka = "Kafka";
}
}

View File

@ -29,19 +29,19 @@ namespace JiShe.CollectBus.Common.Consts
public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen";
/// <summary> /// <summary>
/// 缓存表计信息,{0}=>系统类型,{1}=>表计类别 /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别{3}=>采集频率
/// </summary> /// </summary>
public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:MeterInfo:{"{1}"}:{"{2}"}:"; public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}:";
/// <summary> /// <summary>
/// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率 /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary> /// </summary>
public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}"}:TaskInfo:{"{1}"}:{"{2}"}"; public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TaskInfo:{"{2}"}:{"{3}"}";
/// <summary> /// <summary>
/// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率 /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:TelemetryPacket:{"{1}"}:{"{2}"}:"; public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
} }

View File

@ -762,37 +762,49 @@ namespace JiShe.CollectBus.Common.Helpers
} }
/// <summary> /// <summary>
/// 获取设备Id哈希值 /// 固定Kafka主题分组数为50避免动态计算
/// </summary> /// </summary>
/// <param name="deviceId"></param> private const int FixedGroupCount = 50;
/// <param name="TotalShards"></param>
/// <returns></returns> /// <summary>
public static int GetDeviceHashCode(string deviceId, int TotalShards = 100) /// 根据 deviceId 获取其所属分组ID0~49
/// </summary>
public static int GetGroupId(string deviceId)
{ {
// 计算哈希分组ID int hashCode = deviceId.GetHashCode();
return Math.Abs(deviceId.GetHashCode() % TotalShards); // 更安全的非负取模方式,兼容负数哈希码
return (hashCode % FixedGroupCount + FixedGroupCount) % FixedGroupCount;
} }
/// <summary> /// <summary>
/// 获取设备Id哈希分组 /// 分组优化:使用数组替代字典,预初始化分组容器
/// </summary> /// </summary>
/// <param name="deviceList"></param> public static List<string>[] GroupDevices(List<string> deviceList)
/// <returns></returns>
public static Dictionary<string, List<string>> GetDeviceHashGroup(List<string> deviceList)
{ {
Dictionary<string, List<string>> keyValuePairs = new Dictionary<string, List<string>>(); //直接初始化分组,避免动态扩容
List<string>[] groups = new List<string>[FixedGroupCount];
for (int i = 0; i < FixedGroupCount; i++)
{
groups[i] = new List<string>(capacity: deviceList.Count / FixedGroupCount + 1);
}
// 单次遍历直接分配
foreach (var deviceId in deviceList) foreach (var deviceId in deviceList)
{ {
var hashCode = GetDeviceHashCode(deviceId); int groupId = GetGroupId(deviceId);
groups[groupId].Add(deviceId);
if (!keyValuePairs.ContainsKey(hashCode.ToString()))
{
keyValuePairs.Add(hashCode.ToString(), new List<string>());
}
keyValuePairs[hashCode.ToString()].Add(deviceId);
} }
return keyValuePairs;
return groups;
}
/// <summary>
/// 通过 deviceId 直接定位分组
/// </summary>
public static List<string> FindDeviceGroup(List<string>[] groups, string deviceId)
{
int groupId = GetGroupId(deviceId);
return groups[groupId];
} }
} }
} }

View File

@ -24,6 +24,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka; using Confluent.Kafka;
using MassTransit.SqlTransport.Topology; using MassTransit.SqlTransport.Topology;
using Confluent.Kafka.Admin; using Confluent.Kafka.Admin;
using JiShe.CollectBus.Common.Consts;
namespace JiShe.CollectBus.Host namespace JiShe.CollectBus.Host
@ -370,12 +371,12 @@ namespace JiShe.CollectBus.Host
{ {
var adminClient = new AdminClientBuilder(new AdminClientConfig var adminClient = new AdminClientBuilder(new AdminClientConfig
{ {
BootstrapServers = configuration.GetConnectionString("Kafka") BootstrapServers = configuration.GetConnectionString(CommonConst.Kafka)
}).Build(); }).Build();
try try
{ {
string serverTagName = configuration.GetSection("ServerTagName").Value!; string serverTagName = configuration.GetSection(CommonConst.ServerTagName).Value!;
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName); List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName)); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));