diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 1c56834..ccf7aa5 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -64,6 +64,11 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public abstract string SystemType { get; } + /// + /// 应用服务器部署标记 + /// + public abstract string ServerTagName { get; } + /// ///电表日冻结采集项 /// diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index b1c8b18..e213da3 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -15,6 +15,7 @@ using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; using MassTransit; using Microsoft.AspNetCore.Authorization; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Volo.Abp.Domain.Repositories; using Volo.Abp.Uow; @@ -28,15 +29,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading //[Route($"/energy/app/scheduled")] public class EnergySystemScheduledMeterReadingService : BasicScheduledMeterReadingService { - + string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) + ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration) : base(logger, producerBus, meterReadingRecordRepository, dbProvider) { - + serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; } public sealed override string SystemType => SystemTypeConst.Energy; + public sealed override string ServerTagName => serverTagName; + /// /// 获取采集项列表 /// diff --git a/src/JiShe.CollectBus.Common/Consts/CommonConst.cs b/src/JiShe.CollectBus.Common/Consts/CommonConst.cs new file mode 100644 index 0000000..ff38f22 --- /dev/null +++ b/src/JiShe.CollectBus.Common/Consts/CommonConst.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Common.Consts +{ + /// + /// 常用常量管理 + /// + public class CommonConst + { + /// + /// 服务器标识 + /// + public const string ServerTagName = "ServerTagName"; + + /// + /// Kafka + /// + public const string Kafka = "Kafka"; + } +} diff --git a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs index 9b921c9..526c706 100644 --- a/src/JiShe.CollectBus.Common/Consts/RedisConst.cs +++ b/src/JiShe.CollectBus.Common/Consts/RedisConst.cs @@ -29,19 +29,19 @@ namespace JiShe.CollectBus.Common.Consts public const string FifteenMinuteAcquisitionTimeInterval = "Fifteen"; /// - /// 缓存表计信息,{0}=>系统类型,{1}=>表计类别 + /// 缓存表计信息,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:MeterInfo:{"{1}"}:{"{2}"}:"; + public const string CacheMeterInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:MeterInfo:{"{2}"}:{"{3}"}:"; /// - /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率 + /// 缓存待下发的指令生产任务数据,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}"}:TaskInfo:{"{1}"}:{"{2}"}"; + public const string CacheTasksToBeIssuedKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TaskInfo:{"{2}"}:{"{3}"}"; /// - /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>表计类别,{2}=>采集频率 + /// 缓存表计下发指令数据集,{0}=>系统类型,{1}=>应用服务部署标记,{2}=>表计类别,{3}=>采集频率 /// - public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}"}:TelemetryPacket:{"{1}"}:{"{2}"}:"; + public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:"; public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; } diff --git a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs index 8c980c5..5cf77b1 100644 --- a/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs +++ b/src/JiShe.CollectBus.Common/Helpers/CommonHelper.cs @@ -760,39 +760,51 @@ namespace JiShe.CollectBus.Common.Helpers return fontValue; } - + /// - /// 获取设备Id哈希值 + /// 固定Kafka主题分组数为50,避免动态计算 /// - /// - /// - /// - public static int GetDeviceHashCode(string deviceId, int TotalShards = 100) + private const int FixedGroupCount = 50; + + /// + /// 根据 deviceId 获取其所属分组ID(0~49) + /// + public static int GetGroupId(string deviceId) { - // 计算哈希分组ID - return Math.Abs(deviceId.GetHashCode() % TotalShards); + int hashCode = deviceId.GetHashCode(); + // 更安全的非负取模方式,兼容负数哈希码 + return (hashCode % FixedGroupCount + FixedGroupCount) % FixedGroupCount; } /// - /// 获取设备Id哈希分组 + /// 分组优化:使用数组替代字典,预初始化分组容器 /// - /// - /// - public static Dictionary> GetDeviceHashGroup(List deviceList) + public static List[] GroupDevices(List deviceList) { - Dictionary> keyValuePairs = new Dictionary>(); + //直接初始化分组,避免动态扩容 + List[] groups = new List[FixedGroupCount]; + for (int i = 0; i < FixedGroupCount; i++) + { + groups[i] = new List(capacity: deviceList.Count / FixedGroupCount + 1); + } + + // 单次遍历直接分配 foreach (var deviceId in deviceList) { - var hashCode = GetDeviceHashCode(deviceId); - - if (!keyValuePairs.ContainsKey(hashCode.ToString())) - { - keyValuePairs.Add(hashCode.ToString(), new List()); - } - - keyValuePairs[hashCode.ToString()].Add(deviceId); + int groupId = GetGroupId(deviceId); + groups[groupId].Add(deviceId); } - return keyValuePairs; + + return groups; + } + + /// + /// 通过 deviceId 直接定位分组 + /// + public static List FindDeviceGroup(List[] groups, string deviceId) + { + int groupId = GetGroupId(deviceId); + return groups[groupId]; } } } diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 477008c..269a12f 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -24,6 +24,7 @@ using JiShe.CollectBus.IotSystems.MessageIssueds; using Confluent.Kafka; using MassTransit.SqlTransport.Topology; using Confluent.Kafka.Admin; +using JiShe.CollectBus.Common.Consts; namespace JiShe.CollectBus.Host @@ -370,12 +371,12 @@ namespace JiShe.CollectBus.Host { var adminClient = new AdminClientBuilder(new AdminClientConfig { - BootstrapServers = configuration.GetConnectionString("Kafka") + BootstrapServers = configuration.GetConnectionString(CommonConst.Kafka) }).Build(); try { - string serverTagName = configuration.GetSection("ServerTagName").Value!; + string serverTagName = configuration.GetSection(CommonConst.ServerTagName).Value!; List topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));