From 919f58d6c5ee4b65726a5fc44e1dc093970a185e Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Tue, 17 Mar 2026 14:29:52 +0800 Subject: [PATCH] =?UTF-8?q?OneNET=20=E6=95=B0=E6=8D=AE=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=87=8D=E6=96=B0=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IIoTPlatformAggregationService.cs | 9 ++++ .../IoTPlatformAggregationService.cs | 43 +++++++++++++++++++ .../IoTPlatformAggregationController.cs | 14 ++++++ 3 files changed, 66 insertions(+) diff --git a/src/JiShe.IoT.Application.Contracts/IoTPlatformAggregation/IIoTPlatformAggregationService.cs b/src/JiShe.IoT.Application.Contracts/IoTPlatformAggregation/IIoTPlatformAggregationService.cs index 2a9178d..2178bdc 100644 --- a/src/JiShe.IoT.Application.Contracts/IoTPlatformAggregation/IIoTPlatformAggregationService.cs +++ b/src/JiShe.IoT.Application.Contracts/IoTPlatformAggregation/IIoTPlatformAggregationService.cs @@ -1,5 +1,6 @@ using JiShe.IoT.IoTPlatformAggregation.Dto; using JiShe.ServicePro.Commons; +using JiShe.ServicePro.IoTDBManagement.TableModels; using System; using System.Collections.Generic; using System.Linq; @@ -45,5 +46,13 @@ namespace JiShe.IoT.IoTPlatformAggregation /// Task UpdateIoTPlatformProductThingModelInfoAsync(UpdateIoTPlatformProductThingModelInfoInput input ); + + /// + /// OneNET 数据日志处理重新处理 + /// + /// + /// + Task IoTPlatformDataLogReHandlerAsync(QueryOneNETReceiveMessageInput input + ); } } diff --git a/src/JiShe.IoT.Application/IoTPlatformAggregation/IoTPlatformAggregationService.cs b/src/JiShe.IoT.Application/IoTPlatformAggregation/IoTPlatformAggregationService.cs index 1c8ab35..f214b46 100644 --- a/src/JiShe.IoT.Application/IoTPlatformAggregation/IoTPlatformAggregationService.cs +++ b/src/JiShe.IoT.Application/IoTPlatformAggregation/IoTPlatformAggregationService.cs @@ -7,8 +7,12 @@ using JiShe.ServicePro.CTWingManagement.CTWingProducts; using JiShe.ServicePro.DeviceManagement.ThingModels; using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; +using JiShe.ServicePro.IoTDBManagement.TableModels; +using JiShe.ServicePro.IoTDBManagement.TreeModels; +using JiShe.ServicePro.OneNET.Provider.ReceiveModels; using JiShe.ServicePro.OneNETManagement.OneNETAccounts; using JiShe.ServicePro.OneNETManagement.OneNETProducts; +using JiShe.ServicePro.OneNETManagement.Subscribers; using Microsoft.AspNetCore.DataProtection.KeyManagement; using Microsoft.Extensions.Logging; using System; @@ -31,6 +35,9 @@ namespace JiShe.IoT.IoTPlatformAggregation private readonly ICTWingAccountService _ctwingAccountService; private readonly IOneNETAccountService _oneNETAccountService; private readonly IIoTPlatformThingModelInfoAppService _ioTPlatformThingModelInfoAppService; + private readonly IOneNETSubscriberBasic _oneNETSubscriberBasicService; + private readonly ITableModelService _tableModelService; + public IoTPlatformAggregationService(ILogger logger, @@ -38,6 +45,8 @@ namespace JiShe.IoT.IoTPlatformAggregation IOneNETProductService oneNetProductService, ICTWingAccountService ctwingAccountService, IOneNETAccountService oneNETAccountService, + IOneNETSubscriberBasic oneNETSubscriberBasicService, + ITableModelService tableModelService, IIoTPlatformThingModelInfoAppService ioTPlatformThingModelInfoAppService) { _logger = logger; @@ -46,6 +55,8 @@ namespace JiShe.IoT.IoTPlatformAggregation _ctwingAccountService = ctwingAccountService; _oneNETAccountService = oneNETAccountService; _ioTPlatformThingModelInfoAppService = ioTPlatformThingModelInfoAppService; + _oneNETSubscriberBasicService = oneNETSubscriberBasicService; + _tableModelService = tableModelService; } /// @@ -349,5 +360,37 @@ namespace JiShe.IoT.IoTPlatformAggregation throw; } } + + + + /// + /// OneNET 数据日志处理重新处理 + /// + /// + /// + public async Task IoTPlatformDataLogReHandlerAsync(QueryOneNETReceiveMessageInput input) + { + try + { + var oneNETDataLoglist = await _tableModelService.OneNETLogInfoPageAsync(input); + if (oneNETDataLoglist == null || oneNETDataLoglist.Items == null || oneNETDataLoglist.Items.Count <= 0) + { + _logger.LogError($"{nameof(IoTPlatformDataLogReHandlerAsync)}OneNET数据日志处理重新处理失败,未获取到数据"); + return; + } + + foreach (var item in oneNETDataLoglist.Items) + { + string topicName = string.Equals(item.MessageType, IoTDBDataTypeConst.Data, StringComparison.OrdinalIgnoreCase) ? DistributedMessageCenterConst.OneNETThingModelPropertyChangeReceivedEventName : DistributedMessageCenterConst.OneNETThingModelEventChangeReceivedEventName; + var tempOneNETReceiveBasicModel = item.RawMessage.Deserialize(); + await _oneNETSubscriberBasicService.OneNETReceiveThingModelHandlerAsync(topicName, item.IoTDataType, tempOneNETReceiveBasicModel, true); + } + } + catch (Exception) + { + + throw; + } + } } } diff --git a/src/JiShe.IoT.HttpApi/Controllers/IoTPlatformAggregationController.cs b/src/JiShe.IoT.HttpApi/Controllers/IoTPlatformAggregationController.cs index 37fff38..47f7717 100644 --- a/src/JiShe.IoT.HttpApi/Controllers/IoTPlatformAggregationController.cs +++ b/src/JiShe.IoT.HttpApi/Controllers/IoTPlatformAggregationController.cs @@ -6,6 +6,7 @@ using JiShe.ServicePro; using JiShe.ServicePro.Commons; using JiShe.ServicePro.Core; using JiShe.ServicePro.DeviceManagement.DeviceInfos; +using JiShe.ServicePro.IoTDBManagement.TableModels; namespace JiShe.IoT.Controllers { @@ -69,5 +70,18 @@ namespace JiShe.IoT.Controllers return await _iotPlatformAggregationService.UpdateIoTPlatformProductThingModelInfoAsync(input); } + + /// + /// OneNET数据日志处理重新处理 + /// + /// + /// + [HttpPost(nameof(IoTPlatformDataLogReHandlerAsync))] + [SwaggerOperation(summary: "OneNET数据日志处理重新处理", Tags = new[] { "AggregationIoTPlatform" })] + public async Task IoTPlatformDataLogReHandlerAsync(QueryOneNETReceiveMessageInput input) + { + await _iotPlatformAggregationService.IoTPlatformDataLogReHandlerAsync(input); + } + } }