Merge branch 'feature_定时抄读_05_CY' into dev
This commit is contained in:
commit
65a9481766
@ -103,6 +103,7 @@ public class CollectBusApplicationModule : AbpModule
|
|||||||
|
|
||||||
//默认初始化表计信息
|
//默认初始化表计信息
|
||||||
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
|
dbContext.InitWatermeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -225,9 +225,9 @@ namespace JiShe.CollectBus.Plugins
|
|||||||
//});
|
//});
|
||||||
|
|
||||||
|
|
||||||
string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
//string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);
|
||||||
|
|
||||||
await _producerBus.PublishAsync(topicName, new MessageReceived
|
await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
|
||||||
{
|
{
|
||||||
ClientId = client.Id,
|
ClientId = client.Id,
|
||||||
ClientIp = client.IP,
|
ClientIp = client.IP,
|
||||||
|
|||||||
@ -184,6 +184,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
focusAddressDataList.Add(item.Key);
|
||||||
|
|
||||||
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
|
var redisCacheKey = $"{string.Format(RedisConst.CacheMeterInfoKey, SystemType, ServerTagName, MeterTypeEnum.Ammeter, itemTimeDensity.Key)}{item.Key}";
|
||||||
|
|
||||||
#if DEBUG
|
#if DEBUG
|
||||||
@ -255,6 +257,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
|
await FreeRedisProvider.Instance.SetAsync(taskRedisCacheKey, nextTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//初始化设备组负载控制
|
||||||
|
if (focusAddressDataList == null || focusAddressDataList.Count <= 0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(InitAmmeterCacheData)} 初始化设备组负载控制失败,没有找到对应的设备信息");
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
|
||||||
|
}
|
||||||
|
|
||||||
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
|
_logger.LogInformation($"{nameof(InitAmmeterCacheData)} 初始化电表缓存数据完成");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,7 @@ using System.Threading.Tasks;
|
|||||||
using DotNetCore.CAP;
|
using DotNetCore.CAP;
|
||||||
using JiShe.CollectBus.Ammeters;
|
using JiShe.CollectBus.Ammeters;
|
||||||
using JiShe.CollectBus.Common.Consts;
|
using JiShe.CollectBus.Common.Consts;
|
||||||
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.FreeSql;
|
using JiShe.CollectBus.FreeSql;
|
||||||
using JiShe.CollectBus.GatherItem;
|
using JiShe.CollectBus.GatherItem;
|
||||||
using JiShe.CollectBus.IoTDBProvider;
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
@ -15,6 +16,7 @@ using JiShe.CollectBus.Repository;
|
|||||||
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
using JiShe.CollectBus.Repository.MeterReadingRecord;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Volo.Abp.Domain.Repositories;
|
using Volo.Abp.Domain.Repositories;
|
||||||
@ -169,5 +171,27 @@ namespace JiShe.CollectBus.ScheduledMeterReading
|
|||||||
.Ado
|
.Ado
|
||||||
.QueryAsync<WatermeterInfo>(sql);
|
.QueryAsync<WatermeterInfo>(sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 测试设备分组均衡控制算法
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceCount"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
[HttpGet]
|
||||||
|
public async Task TestDeviceGroupBalanceControl(int deviceCount = 200000)
|
||||||
|
{
|
||||||
|
var deviceList = new List<string>();
|
||||||
|
for (int i = 0; i < deviceCount; i++)
|
||||||
|
{
|
||||||
|
deviceList.Add($"Device_{Guid.NewGuid()}");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// 打印分布统计
|
||||||
|
DeviceGroupBalanceControl.PrintDistributionStats();
|
||||||
|
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -7,6 +7,7 @@ using JiShe.CollectBus.Common.Enums;
|
|||||||
using JiShe.CollectBus.Common.Extensions;
|
using JiShe.CollectBus.Common.Extensions;
|
||||||
using JiShe.CollectBus.Common.Helpers;
|
using JiShe.CollectBus.Common.Helpers;
|
||||||
using JiShe.CollectBus.Common.Models;
|
using JiShe.CollectBus.Common.Models;
|
||||||
|
using JiShe.CollectBus.IoTDBProvider;
|
||||||
using JiShe.CollectBus.IotSystems.Devices;
|
using JiShe.CollectBus.IotSystems.Devices;
|
||||||
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
using JiShe.CollectBus.IotSystems.MessageReceiveds;
|
||||||
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
|
||||||
@ -31,6 +32,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
|
private readonly IRepository<MessageReceived, Guid> _messageReceivedEventRepository;
|
||||||
private readonly IRepository<Device, Guid> _deviceRepository;
|
private readonly IRepository<Device, Guid> _deviceRepository;
|
||||||
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository;
|
||||||
|
private readonly IIoTDBProvider _dbProvider;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
|
/// Initializes a new instance of the <see cref="SubscriberAppService"/> class.
|
||||||
@ -48,7 +50,9 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
|
||||||
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository,
|
||||||
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
|
IRepository<MessageReceived, Guid> messageReceivedEventRepository,
|
||||||
IRepository<Device, Guid> deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository)
|
IRepository<Device, Guid> deviceRepository,
|
||||||
|
IIoTDBProvider dbProvider,
|
||||||
|
IMeterReadingRecordRepository meterReadingRecordsRepository)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_tcpService = tcpService;
|
_tcpService = tcpService;
|
||||||
@ -58,6 +62,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
_messageReceivedEventRepository = messageReceivedEventRepository;
|
_messageReceivedEventRepository = messageReceivedEventRepository;
|
||||||
_deviceRepository = deviceRepository;
|
_deviceRepository = deviceRepository;
|
||||||
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
_meterReadingRecordsRepository = meterReadingRecordsRepository;
|
||||||
|
_dbProvider = dbProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberLoginIssuedEventName)]
|
||||||
@ -119,6 +124,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
[CapSubscribe(ProtocolConst.SubscriberReceivedEventName)]
|
||||||
public async Task ReceivedEvent(MessageReceived receivedMessage)
|
public async Task ReceivedEvent(MessageReceived receivedMessage)
|
||||||
{
|
{
|
||||||
|
var currentTime = Clock.Now;
|
||||||
|
|
||||||
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>("StandardProtocolPlugin");
|
||||||
if (protocolPlugin == null)
|
if (protocolPlugin == null)
|
||||||
{
|
{
|
||||||
@ -126,6 +133,7 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
||||||
//todo 会根据不同的协议进行解析,然后做业务处理
|
//todo 会根据不同的协议进行解析,然后做业务处理
|
||||||
TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage);
|
TB3761 fN = await protocolPlugin.AnalyzeAsync<TB3761>(receivedMessage);
|
||||||
if(fN == null)
|
if(fN == null)
|
||||||
@ -140,11 +148,8 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//todo 查找是否有下发任务
|
//报文入库
|
||||||
|
var entity = new MeterReadingRecords()
|
||||||
|
|
||||||
|
|
||||||
await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords()
|
|
||||||
{
|
{
|
||||||
ReceivedMessageHexString = receivedMessage.MessageHexString,
|
ReceivedMessageHexString = receivedMessage.MessageHexString,
|
||||||
AFN = fN.Afn,
|
AFN = fN.Afn,
|
||||||
@ -152,8 +157,18 @@ namespace JiShe.CollectBus.Subscribers
|
|||||||
Pn = 0,
|
Pn = 0,
|
||||||
FocusAddress = "",
|
FocusAddress = "",
|
||||||
MeterAddress = "",
|
MeterAddress = "",
|
||||||
//DataResult = tb3761FN.Text,
|
};
|
||||||
});
|
|
||||||
|
//如果没数据,则插入,有数据则更新
|
||||||
|
var updateEntity = await _meterReadingRecordsRepository.FirOrDefaultAsync(entity, currentTime);
|
||||||
|
if (updateEntity == null)
|
||||||
|
{
|
||||||
|
await _meterReadingRecordsRepository.InsertAsync(entity, currentTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
_dbProvider.InsertAsync();
|
||||||
|
//todo 查找是否有下发任务
|
||||||
|
|
||||||
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
//await _messageReceivedEventRepository.InsertAsync(receivedMessage);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -43,6 +43,11 @@ namespace JiShe.CollectBus.Common.Consts
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
|
public const string CacheTelemetryPacketInfoKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:TelemetryPacket:{"{2}"}:{"{3}"}:";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 缓存设备平衡关系映射结果,{0}=>系统类型,{1}=>应用服务部署标记
|
||||||
|
/// </summary>
|
||||||
|
public const string CacheDeviceBalanceRelationMapResultKey = $"{CacheBasicDirectoryKey}{"{0}:{1}"}:RelationMap";
|
||||||
|
|
||||||
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
|
public const string CacheAmmeterFocusKey = "CacheAmmeterFocusKey";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -175,7 +175,11 @@ namespace JiShe.CollectBus.Common.Extensions
|
|||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public static string GetDataTableShardingStrategy(this DateTime dateTime)
|
public static string GetDataTableShardingStrategy(this DateTime dateTime)
|
||||||
{
|
{
|
||||||
|
#if DEBUG
|
||||||
return $"{dateTime:yyyyMMddHHmm}";
|
return $"{dateTime:yyyyMMddHHmm}";
|
||||||
|
#else
|
||||||
|
return $"{dateTime:yyyyMMddHH}";
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,8 +1,10 @@
|
|||||||
using System;
|
using JiShe.CollectBus.FreeRedisProvider;
|
||||||
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using Volo.Abp.DependencyInjection;
|
||||||
|
|
||||||
namespace JiShe.CollectBus.Common.Helpers
|
namespace JiShe.CollectBus.Common.Helpers
|
||||||
{
|
{
|
||||||
@ -12,25 +14,29 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
public class DeviceGroupBalanceControl
|
public class DeviceGroupBalanceControl
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备组数量
|
/// 分组集合
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private const int GroupCount = 50;
|
|
||||||
private static List<string>[] _cachedGroups;
|
private static List<string>[] _cachedGroups;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备分组关系映射
|
||||||
|
/// </summary>
|
||||||
private static Dictionary<string, int> _balancedMapping;
|
private static Dictionary<string, int> _balancedMapping;
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 初始化缓存并强制均衡
|
/// 初始化缓存并强制均衡
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static void InitializeCache(List<string> deviceList)
|
public static void InitializeCache(List<string> deviceList,int groupCount = 50)
|
||||||
{
|
{
|
||||||
// 步骤1: 生成均衡映射表
|
// 步骤1: 生成均衡映射表
|
||||||
_balancedMapping = CreateBalancedMapping(deviceList, GroupCount);
|
_balancedMapping = CreateBalancedMapping(deviceList, groupCount);
|
||||||
|
|
||||||
// 步骤2: 根据映射表填充分组
|
// 步骤2: 根据映射表填充分组
|
||||||
_cachedGroups = new List<string>[GroupCount];
|
_cachedGroups = new List<string>[groupCount];
|
||||||
for (int i = 0; i < GroupCount; i++)
|
for (int i = 0; i < groupCount; i++)
|
||||||
{
|
{
|
||||||
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / GroupCount + 1);
|
_cachedGroups[i] = new List<string>(capacity: deviceList.Count / groupCount + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (var deviceId in deviceList)
|
foreach (var deviceId in deviceList)
|
||||||
@ -41,7 +47,7 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 通过 deviceId 获取分组
|
/// 通过 deviceId 获取所在的分组集合
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static List<string> GetGroup(string deviceId)
|
public static List<string> GetGroup(string deviceId)
|
||||||
{
|
{
|
||||||
@ -52,6 +58,17 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
return _cachedGroups[groupId];
|
return _cachedGroups[groupId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 通过 deviceId 获取分组Id
|
||||||
|
/// </summary>
|
||||||
|
public static int GetDeviceGroupId(string deviceId)
|
||||||
|
{
|
||||||
|
if (_balancedMapping == null || _cachedGroups == null)
|
||||||
|
throw new InvalidOperationException("缓存未初始化");
|
||||||
|
|
||||||
|
return _balancedMapping[deviceId];
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 创建均衡映射表
|
/// 创建均衡映射表
|
||||||
@ -69,9 +86,6 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
// 初始化分组计数器
|
// 初始化分组计数器
|
||||||
int[] groupCounters = new int[groupCount];
|
int[] groupCounters = new int[groupCount];
|
||||||
|
|
||||||
// 随机数生成器用于平衡分配
|
|
||||||
Random rand = new Random();
|
|
||||||
|
|
||||||
foreach (var deviceId in deviceList)
|
foreach (var deviceId in deviceList)
|
||||||
{
|
{
|
||||||
int preferredGroup = GetGroupId(deviceId, groupCount);
|
int preferredGroup = GetGroupId(deviceId, groupCount);
|
||||||
|
|||||||
@ -30,6 +30,8 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
||||||
WriteIndented = false,
|
WriteIndented = false,
|
||||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
||||||
|
IgnoreReadOnlyFields = true,
|
||||||
|
IgnoreReadOnlyProperties = true,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,6 +69,8 @@ namespace JiShe.CollectBus.Common.Helpers
|
|||||||
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
|
||||||
WriteIndented = false,
|
WriteIndented = false,
|
||||||
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
|
||||||
|
IgnoreReadOnlyFields = true,
|
||||||
|
IgnoreReadOnlyProperties = true,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -25,6 +25,11 @@
|
|||||||
<PackageReference Include="Serilog" Version="4.1.0" />
|
<PackageReference Include="Serilog" Version="4.1.0" />
|
||||||
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Core" Version="8.3.3" />
|
||||||
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
|
<PackageReference Include="Volo.Abp.Ddd.Domain" Version="8.3.3" />
|
||||||
|
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@ -0,0 +1,15 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.CollectBus.IotSystems.AFNEntity
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// AFN单项数据实体
|
||||||
|
/// </summary>
|
||||||
|
public class AFNDataEntity
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -376,10 +376,9 @@ namespace JiShe.CollectBus.Host
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
string serverTagName = configuration.GetSection(CommonConst.ServerTagName).Value!;
|
|
||||||
|
|
||||||
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued(serverTagName);
|
List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByIssued();
|
||||||
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived(serverTagName));
|
topics.AddRange(ProtocolConstExtensions.GetAllTopicNamesByReceived());
|
||||||
|
|
||||||
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
|
||||||
foreach (var item in topics)
|
foreach (var item in topics)
|
||||||
|
|||||||
@ -35,12 +35,12 @@
|
|||||||
},
|
},
|
||||||
"ConnectionStrings": {
|
"ConnectionStrings": {
|
||||||
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
|
"Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000",
|
||||||
"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092",
|
"Kafka": "192.168.0.151:29092,192.168.0.151:39092,192.168.0.151:49092",
|
||||||
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
"PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False",
|
||||||
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
"EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False"
|
||||||
},
|
},
|
||||||
"Redis": {
|
"Redis": {
|
||||||
"Configuration": "120.24.52.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
"Configuration": "192.168.0.151:6380,password=1q2w3e!@#,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true",
|
||||||
"DefaultDB": "14",
|
"DefaultDB": "14",
|
||||||
"HangfireDB": "15"
|
"HangfireDB": "15"
|
||||||
},
|
},
|
||||||
@ -86,12 +86,13 @@
|
|||||||
"SecurityProtocol": "SASL_PLAINTEXT",
|
"SecurityProtocol": "SASL_PLAINTEXT",
|
||||||
"SaslMechanism": "PLAIN",
|
"SaslMechanism": "PLAIN",
|
||||||
"SaslUserName": "lixiao",
|
"SaslUserName": "lixiao",
|
||||||
"SaslPassword": "lixiao1980"
|
"SaslPassword": "lixiao1980",
|
||||||
|
"NumPartitions": 50
|
||||||
},
|
},
|
||||||
"IoTDBOptions": {
|
"IoTDBOptions": {
|
||||||
"UserName": "root",
|
"UserName": "root",
|
||||||
"Password": "root",
|
"Password": "root",
|
||||||
"ClusterList": [ "192.168.56.102:6667" ],
|
"ClusterList": [ "192.168.0.151:6667" ],
|
||||||
"PoolSize": 2,
|
"PoolSize": 2,
|
||||||
"DataBaseName": "energy",
|
"DataBaseName": "energy",
|
||||||
"OpenDebugMode": true,
|
"OpenDebugMode": true,
|
||||||
|
|||||||
@ -103,8 +103,6 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
|||||||
{
|
{
|
||||||
var collection = await GetShardedCollection(entity.CreationTime);
|
var collection = await GetShardedCollection(entity.CreationTime);
|
||||||
|
|
||||||
var dbContext = await DbContextProvider.GetDbContextAsync();
|
|
||||||
|
|
||||||
await collection.UpdateOneAsync(filter, update);
|
await collection.UpdateOneAsync(filter, update);
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
@ -120,8 +118,8 @@ namespace JiShe.CollectBus.Repository.MeterReadingRecord
|
|||||||
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
|
public async Task<MeterReadingRecords> FirOrDefaultAsync(MeterReadingRecords entity, DateTime? dateTime)
|
||||||
{
|
{
|
||||||
var collection = await GetShardedCollection(dateTime);
|
var collection = await GetShardedCollection(dateTime);
|
||||||
//await collection.findon
|
var query = await collection.FindAsync(d => d.CreationTime == dateTime.Value && d.AFN == entity.AFN && d.Fn == entity.Fn && d.FocusAddress == entity.FocusAddress);
|
||||||
throw new NotImplementedException();
|
return await query.FirstOrDefaultAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@ -15,7 +15,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
||||||
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static List<string> GetAllTopicNamesByIssued(string serverTagName)
|
public static List<string> GetAllTopicNamesByIssued()
|
||||||
{
|
{
|
||||||
List<string> topics = typeof(ProtocolConst)
|
List<string> topics = typeof(ProtocolConst)
|
||||||
.GetFields(BindingFlags.Public | BindingFlags.Static)
|
.GetFields(BindingFlags.Public | BindingFlags.Static)
|
||||||
@ -24,7 +24,6 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
!f.IsInitOnly &&
|
!f.IsInitOnly &&
|
||||||
f.FieldType == typeof(string) &&
|
f.FieldType == typeof(string) &&
|
||||||
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
|
f.Name.EndsWith("IssuedEventName")) // 通过命名规则过滤主题字段
|
||||||
//.Select(f => $"{serverTagName}.{(string)f.GetRawConstantValue()!}")
|
|
||||||
.Select(f => (string)f.GetRawConstantValue()!)
|
.Select(f => (string)f.GetRawConstantValue()!)
|
||||||
.ToList();
|
.ToList();
|
||||||
|
|
||||||
@ -35,7 +34,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
/// 自动获取 ProtocolConst 类中所有下行 Kafka 主题名称
|
||||||
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
/// (通过反射筛选 public const string 且字段名以 "EventName" 结尾的常量)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static List<string> GetAllTopicNamesByReceived(string serverTagName)
|
public static List<string> GetAllTopicNamesByReceived()
|
||||||
{
|
{
|
||||||
//固定的上报主题
|
//固定的上报主题
|
||||||
var topicList = typeof(ProtocolConst)
|
var topicList = typeof(ProtocolConst)
|
||||||
@ -50,12 +49,19 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
|
|
||||||
//动态上报主题,需根据协议的AFN功能码动态获取
|
//动态上报主题,需根据协议的AFN功能码动态获取
|
||||||
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
|
var afnList = EnumExtensions.ToNameValueDictionary<AFN>();
|
||||||
|
|
||||||
|
//需要排除的AFN功能码
|
||||||
|
var excludeItems = new List<int>() { 6, 7, 8,15 };
|
||||||
|
|
||||||
foreach (var item in afnList)
|
foreach (var item in afnList)
|
||||||
{
|
{
|
||||||
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
|
if (excludeItems.Contains(item.Value))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
//return topicList.Select(f => $"{serverTagName}.{f}").ToList();
|
topicList.Add(string.Format(ProtocolConst.AFNTopicNameFormat, item.Value.ToString().PadLeft(2, '0')));
|
||||||
|
}
|
||||||
|
|
||||||
return topicList;
|
return topicList;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,7 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
public const string SubscriberLoginIssuedEventName = "issued.login.event";
|
public const string SubscriberLoginIssuedEventName = "issued.login.event";
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 上行消息主题
|
/// 上行消息主题,测试使用
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string SubscriberReceivedEventName = "received.event";
|
public const string SubscriberReceivedEventName = "received.event";
|
||||||
|
|
||||||
@ -106,7 +106,71 @@ namespace JiShe.CollectBus.Protocol.Contracts
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// AFN上行主题格式
|
/// AFN上行主题格式
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public const string AFNTopicNameFormat = "received.afn{0}.event";
|
public const string AFNTopicNameFormat = "received.afn{0}h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN00H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN00ReceivedEventNameTemp = "received.afn00h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN01H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN00HReceivedEventNameTemp = "received.afn01h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN02H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN01HReceivedEventNameTemp = "received.afn02h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN03H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN02HReceivedEventNameTemp = "received.afn03h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN04H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN04HReceivedEventNameTemp = "received.afn04h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN05H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN05HReceivedEventNameTemp = "received.afn05h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN09H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN09HReceivedEventNameTemp = "received.afn09h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN0AH上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN0AHReceivedEventNameTemp = "received.afn10h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN0BH上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN0BHReceivedEventNameTemp = "received.afn11h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN0CH上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN0CHReceivedEventNameTemp = "received.afn12h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN0DH上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN0DHReceivedEventNameTemp = "received.afn13h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN0EH上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN0EHReceivedEventNameTemp = "received.afn14h.event";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// AFN10H上行主题格式
|
||||||
|
/// </summary>
|
||||||
|
public const string SubscriberAFN10HReceivedEventNameTemp = "received.afn16h.event";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user