From af9b0d8a77b08857014ae1ba419b27208eabf555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E7=9B=8A?= Date: Mon, 24 Mar 2025 21:55:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=A1=A8=E5=88=86=E7=89=87?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E7=BB=9F=E4=B8=80=E5=87=BA=E5=8F=A3=E5=B0=81?= =?UTF-8?q?=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Subscribers/SubscriberAppService.cs | 53 +++++++++++++++---- .../Extensions/DateTimeExtensions.cs | 10 ++++ .../ShardingStrategy/DayShardingStrategy.cs | 9 ++-- .../Abstracts/BaseProtocolPlugin.cs | 2 +- .../Interfaces/IProtocolPlugin.cs | 2 +- 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index fc05bc7..7dbe586 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -1,7 +1,10 @@ using System; +using System.Linq; using System.Threading.Tasks; using DotNetCore.CAP; using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageReceiveds; @@ -9,6 +12,7 @@ using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.Protocol.Contracts.Interfaces; using JiShe.CollectBus.Protocol.Contracts.Models; +using JiShe.CollectBus.Repository.MeterReadingRecord; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; @@ -16,16 +20,16 @@ using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Subscribers { - public class SubscriberAppService : CollectBusAppService, ISubscriberAppService,ICapSubscribe + public class SubscriberAppService : CollectBusAppService, ISubscriberAppService, ICapSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; - private readonly IRepository _messageReceivedEventRepository; + private readonly IRepository _messageReceivedEventRepository; private readonly IRepository _deviceRepository; - private readonly IRepository _meterReadingRecordsRepository; + private readonly IMeterReadingRecordRepository _meterReadingRecordsRepository; /// /// Initializes a new instance of the class. @@ -38,12 +42,12 @@ namespace JiShe.CollectBus.Subscribers /// The message received event repository. /// The device repository. /// The device repository. - public SubscriberAppService(ILogger logger, - ITcpService tcpService, IServiceProvider serviceProvider, - IRepository messageReceivedLoginEventRepository, - IRepository messageReceivedHeartbeatEventRepository, - IRepository messageReceivedEventRepository, - IRepository deviceRepository, IRepository meterReadingRecordsRepository) + public SubscriberAppService(ILogger logger, + ITcpService tcpService, IServiceProvider serviceProvider, + IRepository messageReceivedLoginEventRepository, + IRepository messageReceivedHeartbeatEventRepository, + IRepository messageReceivedEventRepository, + IRepository deviceRepository, IMeterReadingRecordRepository meterReadingRecordsRepository) { _logger = logger; _tcpService = tcpService; @@ -79,7 +83,7 @@ namespace JiShe.CollectBus.Subscribers throw new ArgumentOutOfRangeException(); } var device = await _deviceRepository.FindAsync(a => a.Number == issuedEventMessage.DeviceNo); - if (device!=null) + if (device != null) { await _tcpService.SendAsync(device.ClientId, issuedEventMessage.Message); } @@ -96,7 +100,34 @@ namespace JiShe.CollectBus.Subscribers else { //todo 会根据不同的协议进行解析,然后做业务处理 - TB3761FN fN = await protocolPlugin.AnalyzeAsync(receivedMessage); + TB3761 fN = await protocolPlugin.AnalyzeAsync(receivedMessage); + if(fN == null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return; + } + var tb3761FN = fN.FnList.FirstOrDefault(); + if (tb3761FN == null) + { + Logger.LogError($"数据解析失败:{receivedMessage.Serialize()}"); + return; + } + + //todo 查找是否有下发任务 + + + + await _meterReadingRecordsRepository.InsertAsync(new MeterReadingRecords() + { + ReceivedMessageHexString = receivedMessage.MessageHexString, + AFN = fN.Afn, + Fn = tb3761FN.Fn, + Pn = 0, + FocusAddress = "", + MeterAddress = "", + DataResult = tb3761FN.Text, + }); + //await _messageReceivedEventRepository.InsertAsync(receivedMessage); } } diff --git a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs index b12778f..904d4b9 100644 --- a/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs +++ b/src/JiShe.CollectBus.Common/Extensions/DateTimeExtensions.cs @@ -167,5 +167,15 @@ namespace JiShe.CollectBus.Common.Extensions ) ); } + + /// + /// 获取数据表分片策略 + /// + /// + /// + public static string GetDataTableShardingStrategy(this DateTime dateTime) + { + return $"{dateTime:yyyyMMddHHmm}"; + } } } diff --git a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs index 75157e5..f26136d 100644 --- a/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs +++ b/src/JiShe.CollectBus.MongoDB/ShardingStrategy/DayShardingStrategy.cs @@ -1,4 +1,5 @@ -using System; +using JiShe.CollectBus.Common.Extensions; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -21,7 +22,7 @@ namespace JiShe.CollectBus.ShardingStrategy public string GetCollectionName(DateTime dateTime) { var baseName = typeof(TEntity).Name; - return $"{baseName}_{dateTime:yyyyMMddHHmm}"; + return $"{baseName}_{dateTime.GetDataTableShardingStrategy()}"; } /// @@ -31,7 +32,7 @@ namespace JiShe.CollectBus.ShardingStrategy public string GetCurrentCollectionName() { var baseName = typeof(TEntity).Name; - return $"{baseName}_{DateTime.Now:yyyyMMddHHmm}"; + return $"{baseName}_{DateTime.Now.GetDataTableShardingStrategy()}"; } /// @@ -49,7 +50,7 @@ namespace JiShe.CollectBus.ShardingStrategy while (current <= end) { - months.Add($"{baseName}_{current:yyyyMMddHHmm}"); + months.Add($"{baseName}_{current.GetDataTableShardingStrategy()}"); current = current.AddMonths(1); } diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 609dcdb..4638983 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -55,7 +55,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Abstracts //await _protocolInfoCache.Get() } - public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN; + public abstract Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761; /// /// 登录帧解析 diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs index 5ad92c1..2f48cd2 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Interfaces/IProtocolPlugin.cs @@ -14,7 +14,7 @@ namespace JiShe.CollectBus.Protocol.Contracts.Interfaces Task AddAsync(); - Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761FN; + Task AnalyzeAsync(MessageReceived messageReceived, Action? sendAction = null) where T : TB3761; Task LoginAsync(MessageReceivedLogin messageReceived);