Compare commits

..

No commits in common. "65a94817660799aebe687bce28bf11256abdb52d" and "691cc2e3fe823a090c6b14d7ca587f8dca8d447c" have entirely different histories.

16 changed files with 45 additions and 213 deletions

View File

@ -103,7 +103,6 @@ public class CollectBusApplicationModule : AbpModule
//默认初始化表计信息 //默认初始化表计信息
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
} }
} }

View File

@ -225,9 +225,9 @@ namespace JiShe.CollectBus.Plugins
//}); //});
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn); string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived await _producerBus.PublishAsync(topicName, new MessageReceived
{ {
ClientId = client.Id, ClientId = client.Id,
ClientIp = client.IP, ClientIp = client.IP,

View File

@ -184,8 +184,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
continue; continue;
} }
focusAddressDataList.Add(item.Key);
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}"; var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
#if DEBUG #if DEBUG
@ -257,17 +255,6 @@ namespace JiShe.CollectBus.ScheduledMeterReading
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask); await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
} }
//初始化设备组负载控制
if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
{
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
}
else
{
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
}
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成"); _logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
} }
@ -345,7 +332,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
int timeDensity = 5; int timeDensity = 5;
var currentTime = DateTime.Now; var currentTime = DateTime.Now;
var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter); var redisKeyList = GetTelemetryPacketCacheKeyPrefix(timeDensity, MeterTypeEnum.Ammeter);
var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList); var fiveMinutekeyList = await FreeRedisProvider.Instance.KeysAsync(redisKeyList);
if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0) if (fiveMinutekeyList == null || fiveMinutekeyList.Length <= 0)
{ {

View File

@ -4,7 +4,6 @@ using System.Threading.Tasks;
using DotNetCore.CAP; using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.FreeSql;
using JiShe.CollectBus.GatherItem; using JiShe.CollectBus.GatherItem;
using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IoTDBProvider;
@ -16,7 +15,6 @@ using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord; using JiShe.CollectBus.Repository.MeterReadingRecord;
using MassTransit; using MassTransit;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Volo.Abp.Domain.Repositories; using Volo.Abp.Domain.Repositories;
@ -171,27 +169,5 @@ namespace JiShe.CollectBus.ScheduledMeterReading
.Ado .Ado
.QueryAsync<WatermeterInfo>(sql); .QueryAsync<WatermeterInfo>(sql);
} }
/// <summary>
/// 测试设备分组均衡控制算法
/// </summary>
/// <param name="deviceCount"></param>
/// <returns></returns>
[HttpGet]
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
{
var deviceList = new List<string>();
for (int i = 0; i < deviceCount; i++)
{
deviceList.Add($"Device_{Guid.NewGuid()}");
}
// 打印分布统计
DeviceGroupBalanceControl.PrintDistributionStats();
await Task.CompletedTask;
}
} }
} }

View File

@ -7,7 +7,6 @@ using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Common.Models;
using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.MeterReadingRecords;
@ -32,7 +31,6 @@ namespace JiShe.CollectBus.Subscribers
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository; private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
private readonly IRepository<Device, Guid> _deviceRepository; private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
private readonly IIoTDBProvider _dbProvider;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class. /// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
@ -50,9 +48,7 @@ namespace JiShe.CollectBus.Subscribers
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository, IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository, IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
IRepository<MessageReceived, Guid> messageReceivedEventRepository, IRepository<MessageReceived, Guid> messageReceivedEventRepository,
IRepository<Device, Guid> deviceRepository, IRepository<Device, Guid> deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository)
IIoTDBProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordsRepository)
{ {
_logger = logger; _logger = logger;
_tcpService = tcpService; _tcpService = tcpService;
@ -62,7 +58,6 @@ namespace JiShe.CollectBus.Subscribers
_messageReceivedEventRepository = messageReceivedEventRepository; _messageReceivedEventRepository = messageReceivedEventRepository;
_deviceRepository = deviceRepository; _deviceRepository = deviceRepository;
_meterReadingRecordsRepository = meterReadingRecordsRepository; _meterReadingRecordsRepository = meterReadingRecordsRepository;
_dbProvider = dbProvider;
} }
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)] [CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
@ -124,8 +119,6 @@ namespace JiShe.CollectBus.Subscribers
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)] [CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
public async Task ReceivedEvent(MessageReceived receivedMessage) public async Task ReceivedEvent(MessageReceived receivedMessage)
{ {
var currentTime = Clock.Now;
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin"); var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
if (protocolPlugin == null) if (protocolPlugin == null)
{ {
@ -133,7 +126,6 @@ namespace JiShe.CollectBus.Subscribers
} }
else else
{ {
//todo 会根据不同的协议进行解析,然后做业务处理 //todo 会根据不同的协议进行解析,然后做业务处理
TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage); TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage);
if(fN == null) if(fN == null)
@ -148,8 +140,11 @@ namespace JiShe.CollectBus.Subscribers
return; return;
} }
//报文入库 //todo 查找是否有下发任务
var entity = new MeterReadingRecords()
await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords()
{ {
ReceivedMessageHexString = receivedMessage.MessageHexString, ReceivedMessageHexString = receivedMessage.MessageHexString,
AFN = fN.Afn, AFN = fN.Afn,
@ -157,18 +152,8 @@ namespace JiShe.CollectBus.Subscribers
Pn = 0, Pn = 0,
FocusAddress = "", FocusAddress = "",
MeterAddress = "", MeterAddress = "",
}; //DataResult = tb3761FN.Text,
});
//如果没数据,则插入,有数据则更新
var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
if (updateEntity == null)
{
await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
}
_dbProvider.InsertAsync();
//todo 查找是否有下发任务
//await _messageReceivedEventRepository.InsertAsync(receivedMessage); //await _messageReceivedEventRepository.InsertAsync(receivedMessage);
} }

View File

@ -43,11 +43,6 @@ namespace JiShe.CollectBus.Common.Consts
/// </summary> /// </summary>
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:"; public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
/// <summary>
/// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
/// </summary>
public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey"; public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
} }
} }

View File

@ -175,11 +175,7 @@ namespace JiShe.CollectBus.Common.Extensions
/// <returns></returns> /// <returns></returns>
public static string GetDataTableShardingStrategy(this DateTime dateTime) public static string GetDataTableShardingStrategy(this DateTime dateTime)
{ {
#if DEBUG
return $"{dateTime:yyyyMMddHHmm}"; return $"{dateTime:yyyyMMddHHmm}";
#else
return $"{dateTime:yyyyMMddHH}";
#endif
} }
} }
} }

View File

@ -1,10 +1,8 @@
using JiShe.CollectBus.FreeRedisProvider; using System;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace JiShe.CollectBus.Common.Helpers namespace JiShe.CollectBus.Common.Helpers
{ {
@ -12,42 +10,38 @@ namespace JiShe.CollectBus.Common.Helpers
/// 设备组负载控制 /// 设备组负载控制
/// </summary> /// </summary>
public class DeviceGroupBalanceControl public class DeviceGroupBalanceControl
{ {
/// <summary> /// <summary>
/// 分组集合 /// 设备组数量
/// </summary> /// </summary>
private const int GroupCount = 50;
private static List<string>[] _cachedGroups; private static List<string>[] _cachedGroups;
/// <summary>
/// 设备分组关系映射
/// </summary>
private static Dictionary<string, int> _balancedMapping; private static Dictionary<string, int> _balancedMapping;
/// <summary> /// <summary>
/// 初始化缓存并强制均衡 /// 初始化缓存并强制均衡
/// </summary> /// </summary>
public static void InitializeCache(List<string> deviceList,int groupCount = 50) public static void InitializeCache(List<string> deviceList)
{ {
// 步骤1: 生成均衡映射表 // 步骤1: 生成均衡映射表
_balancedMapping = CreateBalancedMapping(deviceList, groupCount); _balancedMapping = CreateBalancedMapping(deviceList, GroupCount);
// 步骤2: 根据映射表填充分组 // 步骤2: 根据映射表填充分组
_cachedGroups = new List<string>[groupCount]; _cachedGroups = new List<string>[GroupCount];
for (int i = 0; i < groupCount; i++) for (int i = 0; i < GroupCount; i++)
{ {
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / groupCount + 1); _cachedGroups[i] = new List<string>(capacity: deviceList.Count / GroupCount + 1);
} }
foreach (var deviceId in deviceList) foreach (var deviceId in deviceList)
{ {
int groupId = _balancedMapping[deviceId]; int groupId = _balancedMapping[deviceId];
_cachedGroups[groupId].Add(deviceId); _cachedGroups[groupId].Add(deviceId);
} }
} }
/// <summary> /// <summary>
/// 通过 deviceId 获取所在的分组集合 /// 通过 deviceId 获取分组
/// </summary> /// </summary>
public static List<string> GetGroup(string deviceId) public static List<string> GetGroup(string deviceId)
{ {
@ -58,17 +52,6 @@ namespace JiShe.CollectBus.Common.Helpers
return _cachedGroups[groupId]; return _cachedGroups[groupId];
} }
/// <summary>
/// 通过 deviceId 获取分组Id
/// </summary>
public static int GetDeviceGroupId(string deviceId)
{
if (_balancedMapping == null || _cachedGroups == null)
throw new InvalidOperationException("缓存未初始化");
return _balancedMapping[deviceId];
}
/// <summary> /// <summary>
/// 创建均衡映射表 /// 创建均衡映射表
@ -86,6 +69,9 @@ namespace JiShe.CollectBus.Common.Helpers
// 初始化分组计数器 // 初始化分组计数器
int[] groupCounters = new int[groupCount]; int[] groupCounters = new int[groupCount];
// 随机数生成器用于平衡分配
Random rand = new Random();
foreach (var deviceId in deviceList) foreach (var deviceId in deviceList)
{ {
int preferredGroup = GetGroupId(deviceId, groupCount); int preferredGroup = GetGroupId(deviceId, groupCount);

View File

@ -30,8 +30,6 @@ namespace JiShe.CollectBus.Common.Helpers
DefaultIgnoreCondition = JsonIgnoreCondition.Never, DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false, WriteIndented = false,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
}; };
} }
@ -69,8 +67,6 @@ namespace JiShe.CollectBus.Common.Helpers
DefaultIgnoreCondition = JsonIgnoreCondition.Never, DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false, WriteIndented = false,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping, Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
IgnoreReadOnlyFields = true,
IgnoreReadOnlyProperties = true,
}; };
} }

View File

@ -25,11 +25,6 @@
<PackageReference Include="Serilog" Version="4.1.0" /> <PackageReference Include="Serilog" Version="4.1.0" />
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" /> <PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -1,15 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.IotSystems.AFNEntity
{
/// <summary>
/// AFN单项数据实体
/// </summary>
public class AFNDataEntity
{
}
}

View File

@ -375,10 +375,11 @@ namespace JiShe.CollectBus.Host
}).Build(); }).Build();
try try
{ {
string serverTagName = configuration.GetSection(CommonConst.ServerTagName).Value!;
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(); List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived()); topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>(); List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics) foreach (var item in topics)

View File

@ -35,12 +35,12 @@
}, },
"ConnectionStrings": { "ConnectionStrings": {
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
"Kafka": "192.168.0.151:29092,192.168.0.151:39092,192.168.0.151:49092", "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
}, },
"Redis": { "Redis": {
"Configuration": "192.168.0.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "Configuration": "120.24.52.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
"DefaultDB": "14", "DefaultDB": "14",
"HangfireDB": "15" "HangfireDB": "15"
}, },
@ -86,13 +86,12 @@
"SecurityProtocol": "SASL_PLAINTEXT", "SecurityProtocol": "SASL_PLAINTEXT",
"SaslMechanism": "PLAIN", "SaslMechanism": "PLAIN",
"SaslUserName": "lixiao", "SaslUserName": "lixiao",
"SaslPassword": "lixiao1980", "SaslPassword": "lixiao1980"
"NumPartitions": 50
}, },
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
"Password": "root", "Password": "root",
"ClusterList": [ "192.168.0.151:6667" ], "ClusterList": [ "192.168.56.102:6667" ],
"PoolSize": 2, "PoolSize": 2,
"DataBaseName": "energy", "DataBaseName": "energy",
"OpenDebugMode": true, "OpenDebugMode": true,

View File

@ -88,7 +88,7 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
public async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime) public async Task<MeterReadingRecords> InsertAsync(MeterReadingRecords entity, DateTime? dateTime)
{ {
var collection = await GetShardedCollection(dateTime); var collection = await GetShardedCollection(dateTime);
await collection.InsertOneAsync(entity); await collection.InsertOneAsync(entity);
return entity; return entity;
} }
@ -103,6 +103,8 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
{ {
var collection = await GetShardedCollection(entity.CreationTime); var collection = await GetShardedCollection(entity.CreationTime);
var dbContext = await DbContextProvider.GetDbContextAsync();
await collection.UpdateOneAsync(filter, update); await collection.UpdateOneAsync(filter, update);
return entity; return entity;
} }
@ -118,8 +120,8 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime) public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
{ {
var collection = await GetShardedCollection(dateTime); var collection = await GetShardedCollection(dateTime);
var query = await collection.FindAsync(d => d.CreationTime == dateTime.Value && d.AFN == entity.AFN && d.Fn == entity.Fn && d.FocusAddress == entity.FocusAddress); //await collection.findon
return await query.FirstOrDefaultAsync(); throw new NotImplementedException();
} }
/// <summary> /// <summary>

View File

@ -15,7 +15,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary> /// </summary>
public static List<string> GetAllTopicNamesByIssued() public static List<string> GetAllTopicNamesByIssued(string serverTagName)
{ {
List<string> topics = typeof(ProtocolConst) List<string> topics = typeof(ProtocolConst)
.GetFields(BindingFlags.Public | BindingFlags.Static) .GetFields(BindingFlags.Public | BindingFlags.Static)
@ -24,6 +24,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
!f.IsInitOnly && !f.IsInitOnly &&
f.FieldType == typeof(string) && f.FieldType == typeof(string) &&
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段 f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
//.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}")
.Select(f => (string)f.GetRawConstantValue()!) .Select(f => (string)f.GetRawConstantValue()!)
.ToList(); .ToList();
@ -34,7 +35,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称 /// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量) /// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
/// </summary> /// </summary>
public static List<string> GetAllTopicNamesByReceived() public static List<string> GetAllTopicNamesByReceived(string serverTagName)
{ {
//固定的上报主题 //固定的上报主题
var topicList = typeof(ProtocolConst) var topicList = typeof(ProtocolConst)
@ -49,20 +50,13 @@ namespace JiShe.CollectBus.Protocol.Contracts
//动态上报主题需根据协议的AFN功能码动态获取 //动态上报主题需根据协议的AFN功能码动态获取
var afnList = EnumExtensions.ToNameValueDictionary<AFN>(); var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
//需要排除的AFN功能码
var excludeItems = new List<int>() { 6, 7, 8,15 };
foreach (var item in afnList) foreach (var item in afnList)
{ {
if (excludeItems.Contains(item.Value))
{
continue;
}
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0'))); topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
} }
//return topicList.Select(f => $"{serverTagName}.{f}").ToList();
return topicList; return topicList;
} }
} }

View File

@ -19,7 +19,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
public const string SubscriberLoginIssuedEventName = "issued.login.event"; public const string SubscriberLoginIssuedEventName = "issued.login.event";
/// <summary> /// <summary>
/// 上行消息主题,测试使用 /// 上行消息主题
/// </summary> /// </summary>
public const string SubscriberReceivedEventName = "received.event"; public const string SubscriberReceivedEventName = "received.event";
@ -106,71 +106,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
/// <summary> /// <summary>
/// AFN上行主题格式 /// AFN上行主题格式
/// </summary> /// </summary>
public const string AFNTopicNameFormat = "received.afn{0}h.event"; public const string AFNTopicNameFormat = "received.afn{0}.event";
/// <summary>
/// AFN00H上行主题格式
/// </summary>
public const string SubscriberAFN00ReceivedEventNameTemp = "received.afn00h.event";
/// <summary>
/// AFN01H上行主题格式
/// </summary>
public const string SubscriberAFN00HReceivedEventNameTemp = "received.afn01h.event";
/// <summary>
/// AFN02H上行主题格式
/// </summary>
public const string SubscriberAFN01HReceivedEventNameTemp = "received.afn02h.event";
/// <summary>
/// AFN03H上行主题格式
/// </summary>
public const string SubscriberAFN02HReceivedEventNameTemp = "received.afn03h.event";
/// <summary>
/// AFN04H上行主题格式
/// </summary>
public const string SubscriberAFN04HReceivedEventNameTemp = "received.afn04h.event";
/// <summary>
/// AFN05H上行主题格式
/// </summary>
public const string SubscriberAFN05HReceivedEventNameTemp = "received.afn05h.event";
/// <summary>
/// AFN09H上行主题格式
/// </summary>
public const string SubscriberAFN09HReceivedEventNameTemp = "received.afn09h.event";
/// <summary>
/// AFN0AH上行主题格式
/// </summary>
public const string SubscriberAFN0AHReceivedEventNameTemp = "received.afn10h.event";
/// <summary>
/// AFN0BH上行主题格式
/// </summary>
public const string SubscriberAFN0BHReceivedEventNameTemp = "received.afn11h.event";
/// <summary>
/// AFN0CH上行主题格式
/// </summary>
public const string SubscriberAFN0CHReceivedEventNameTemp = "received.afn12h.event";
/// <summary>
/// AFN0DH上行主题格式
/// </summary>
public const string SubscriberAFN0DHReceivedEventNameTemp = "received.afn13h.event";
/// <summary>
/// AFN0EH上行主题格式
/// </summary>
public const string SubscriberAFN0EHReceivedEventNameTemp = "received.afn14h.event";
/// <summary>
/// AFN10H上行主题格式
/// </summary>
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
} }
} }