From 0d787c2e1bd61411d1104e59015f92ae66f8d387 Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Fri, 19 Sep 2025 11:36:17 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=8C=87=E4=BB=A4=E4=B8=8B?=
=?UTF-8?q?=E5=8F=91=E9=80=BB=E8=BE=91=E5=A4=84=E7=90=86=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../iotdb/init/init-iot-db.sql | 2 +-
.../Dto/DeviceCommandForApiInput.cs | 10 ++++
.../BusinessSystemAggregationService.cs | 24 +++++++-
.../DeviceAggregationService.cs | 33 +++++++++--
src/JiShe.IoT.Application/IoTAppService.cs | 57 +++++++++++++++++++
src/JiShe.IoT.DbMigrator/appsettings.json | 10 ++--
6 files changed, 121 insertions(+), 15 deletions(-)
diff --git a/DockerComposeShells/iotdb/init/init-iot-db.sql b/DockerComposeShells/iotdb/init/init-iot-db.sql
index cfc9636..26db556 100644
--- a/DockerComposeShells/iotdb/init/init-iot-db.sql
+++ b/DockerComposeShells/iotdb/init/init-iot-db.sql
@@ -1,5 +1,5 @@
CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
-CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, MessageType STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
+CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, TelemetryType INT32 FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';
\ No newline at end of file
diff --git a/src/JiShe.IoT.Application.Contracts/DeviceAggregation/Dto/DeviceCommandForApiInput.cs b/src/JiShe.IoT.Application.Contracts/DeviceAggregation/Dto/DeviceCommandForApiInput.cs
index e77046d..a7f6e22 100644
--- a/src/JiShe.IoT.Application.Contracts/DeviceAggregation/Dto/DeviceCommandForApiInput.cs
+++ b/src/JiShe.IoT.Application.Contracts/DeviceAggregation/Dto/DeviceCommandForApiInput.cs
@@ -15,5 +15,15 @@ namespace JiShe.IoT.DeviceAggregation
///
[Required(ErrorMessage = "命令内容不能为空")]
public string CommandContent { get; set; }
+
+ ///
+ /// 设备类型
+ ///
+ public DeviceTypeEnum? DeviceType { get; set; }
+
+ ///
+ /// 设备指令类型
+ ///
+ public DeviceTelemetryCommandTypeEnum? TelemetryType { get; set; }
}
}
\ No newline at end of file
diff --git a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs
index 16f22fc..7ee6879 100644
--- a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs
+++ b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs
@@ -1,10 +1,14 @@
using JiShe.ServicePro;
+using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Core;
+using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
+using JiShe.ServicePro.IoTDBManagement.DataChannels;
+using Mapster;
using Microsoft.Extensions.Options;
namespace JiShe.IoT.BusinessSystemAggregation
@@ -12,9 +16,10 @@ namespace JiShe.IoT.BusinessSystemAggregation
///
/// 业务系统聚合服务
///
- public class BusinessSystemAggregationService(IOptions options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService) : IoTAppService, IBusinessSystemAggregationService
+ public class BusinessSystemAggregationService(IOptions options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService, IIoTDBDataChannelManageService ioTDBDataChannelManageService, IOptions _ioTDBOptions) : IoTAppService, IBusinessSystemAggregationService
{
ServerApplicationOptions srverOptions = options.Value;
+ IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
///
/// 接收业务系统指令信息,缓存进Kafka
@@ -38,14 +43,16 @@ namespace JiShe.IoT.BusinessSystemAggregation
//判断是否需要解密
ReceiveCommandInfoDto messageBody = null;
+ string tempMessageBody = null;
if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
{
- string tempRaw = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
- messageBody = tempRaw.Deserialize();
+ tempMessageBody = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
+ messageBody = tempMessageBody.Deserialize();
}
else
{
messageBody = input.Message.Deserialize();
+ tempMessageBody = input.Message;
}
if (messageBody == null)
@@ -61,9 +68,20 @@ namespace JiShe.IoT.BusinessSystemAggregation
var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
+ if (deviceInfo == null)
+ {
+ return HttpDataResultExtensions.Failed("设备不存在", -1041, ResponeResultEnum.Fail);
+ }
+
+ var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt(), tempMessageBody);
+
//将指令存储Kafka的OneNET主题中
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{
+ //数据写入遥测任务数据存储通道
+ await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
+
+
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
}
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
diff --git a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
index 5715fab..1745f38 100644
--- a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
+++ b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
@@ -1,16 +1,21 @@
using JiShe.IoT.DeviceAggregation.Dto;
using JiShe.ServicePro;
+using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Core;
+using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.DeviceManagement.DeviceInfos.Dto;
using JiShe.ServicePro.DeviceManagement.Permissions;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
+using JiShe.ServicePro.IoTDBManagement.DataChannels;
+using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.OneNETManagement.OneNETDevices;
using JiShe.ServicePro.OneNETManagement.OneNETProducts;
using Mapster;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Volo.Abp;
namespace JiShe.IoT.DeviceAggregation
@@ -22,8 +27,13 @@ namespace JiShe.IoT.DeviceAggregation
/// 设备服务
/// OneNET设备服务
/// Redis发布订阅服务
- public class DeviceAggregationService(ILogger logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IReliableRedisPubSubService redisPubSubService) : IoTAppService, IDeviceAggregationService
+ /// 数据通道
+ /// IoTDBOptions
+ public class DeviceAggregationService(ILogger logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IReliableRedisPubSubService redisPubSubService, IIoTDBDataChannelManageService ioTDBDataChannelManageService,IOptions _ioTDBOptions) : IoTAppService, IDeviceAggregationService
{
+ IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
+
+
///
/// 管理后台创建设备信息
///
@@ -279,6 +289,10 @@ namespace JiShe.IoT.DeviceAggregation
try
{
var deviceInfo = await deviceAppService.FindByIdAsync(input);
+ if (deviceInfo == null)
+ {
+ throw new UserFriendlyException($"设备不存在");
+ }
//将指令存储Kafka的OneNET主题中
var commandRequest = new OpenApiRequest()
{
@@ -286,14 +300,21 @@ namespace JiShe.IoT.DeviceAggregation
{
DeviceAddress = deviceInfo.DeviceAddress,
Commands = input.CommandContent.Deserialize>(),
- DeviceType = DeviceTypeEnum.Focus,//todo 设备类型 需要跟设备统一什么情况下知道具体设备类型
+ DeviceType = input.DeviceType ?? DeviceTypeEnum.Focus,//todo 设备类型 需要跟设备统一什么情况下知道具体设备类型
SourceType = DeviceTelemetrySourceTypeEnum.AdminSystem,
+ TelemetryType = input.TelemetryType ?? DeviceTelemetryCommandTypeEnum.抄读,
+ IoTPlatform = deviceInfo.IoTPlatform,
}.Serialize(),
};
+ var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, commandRequest, deviceInfo.Adapt(), commandRequest.Message);
+
+ //数据写入遥测任务数据存储通道
+ await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
+
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{
- return await DeviceCommandInfoToOneNET(deviceInfo, commandRequest);
+ return await DeviceCommandInfoToOneNET(deviceInfo, packetTaskInfo);
}
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
{
@@ -571,10 +592,10 @@ namespace JiShe.IoT.DeviceAggregation
/// 发送OneNET平台设备指令
///
///
- ///
+ ///
///
///
- public async Task DeviceCommandInfoToOneNET(DeviceManagementInfoDto deviceInfo, OpenApiRequest commandRequest)
+ public async Task DeviceCommandInfoToOneNET(DeviceManagementInfoDto deviceInfo, DeviceTelemetryPacketTaskInfo packetTaskInfo)
{
try
{
@@ -596,7 +617,7 @@ namespace JiShe.IoT.DeviceAggregation
throw new UserFriendlyException("设备不在线");
}
- await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, commandRequest);
+ await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo);
return true;
}
catch (Exception)
diff --git a/src/JiShe.IoT.Application/IoTAppService.cs b/src/JiShe.IoT.Application/IoTAppService.cs
index 65df7dd..420d724 100644
--- a/src/JiShe.IoT.Application/IoTAppService.cs
+++ b/src/JiShe.IoT.Application/IoTAppService.cs
@@ -1,5 +1,13 @@
+using JiShe.ServicePro;
+using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
+using JiShe.ServicePro.Consts;
+using JiShe.ServicePro.Core;
+using JiShe.ServicePro.Dto;
+using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider;
+using JiShe.ServicePro.IoTDBManagement.TableModels;
+using Volo.Abp;
namespace JiShe.IoT
{
@@ -14,5 +22,54 @@ namespace JiShe.IoT
{
LocalizationResource = typeof(IoTResource);
}
+
+ ///
+ /// ȡ豸ңָϢ
+ ///
+ /// IoTDBOptions
+ /// ԭʼ
+ /// 豸Ϣ
+ /// Ϣ
+ ///
+ ///
+ protected DeviceTelemetryPacketTaskInfo GetDeviceTelemetryPacketTaskInfo(IoTDBOptions iotDBOptions, OpenApiRequest input, DeviceCacheInfos deviceInfo,string messageBody)
+ {
+ try
+ {
+ if (iotDBOptions == null || string.IsNullOrWhiteSpace(iotDBOptions.DataBaseName) || input == null || deviceInfo == null || string.IsNullOrWhiteSpace(messageBody))
+ {
+ throw new UserFriendlyException($"豸ңָʧܣϢ쳣");
+ }
+ //лϢݣõʵ
+ ReceiveCommandInfoDto commandIssueInfo = input.Message.Deserialize();
+
+ var oneNETIssueMessageEntity = new DeviceTelemetryPacketTaskInfo()
+ {
+ DataBaseName = iotDBOptions.DataBaseName,
+ DeviceType = $"{commandIssueInfo.DeviceType}",
+ DeviceAddress = commandIssueInfo.DeviceAddress,
+ IssueRawMessage = input.Serialize(),
+ IoTDataType = IoTDBDataTypeConst.Command,
+ TelemetryType = (int)commandIssueInfo.TelemetryType,
+ TelemetrySource = (int)commandIssueInfo.SourceType,
+ IoTPlatform = (int)commandIssueInfo.IoTPlatform,
+ IoTPlatformProductId = deviceInfo.IoTPlatformProductId,
+ IoTPlatformDeviceOpenInfo = deviceInfo.IoTPlatformDeviceOpenInfo,
+ IoTPlatformAccountId = deviceInfo.IoTPlatformAccountId,
+ AccountPhoneNumber = deviceInfo.AccountPhoneNumber,
+ IoTPlatformProductName = deviceInfo.IoTPlatformProductName,
+ IssuePayload = messageBody,
+ RetryCount = 0,
+ IssueStatus = (int)DeviceCommandIssueStatusEnum.Unprocessed
+ };
+
+ return oneNETIssueMessageEntity;
+ }
+ catch (Exception)
+ {
+
+ throw;
+ }
+ }
}
}
diff --git a/src/JiShe.IoT.DbMigrator/appsettings.json b/src/JiShe.IoT.DbMigrator/appsettings.json
index cadbfd0..5406a18 100644
--- a/src/JiShe.IoT.DbMigrator/appsettings.json
+++ b/src/JiShe.IoT.DbMigrator/appsettings.json
@@ -5,13 +5,13 @@
"IoTDBOptions": {
"UserName": "root",
"Password": "Lixiao@1980",
- //"TreeModelClusterList": [ "192.168.111.42:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
- //"TableModelClusterList": [ "192.168.111.42:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
- "TreeModelClusterList": [ "192.168.111.42:30710" ],
- "TableModelClusterList": [ "192.168.111.42:30710" ],
+ //"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
+ //"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
+ "TreeModelClusterList": [ "47.110.53.196:30710" ],
+ "TableModelClusterList": [ "47.110.53.196:30710" ],
"PoolSize": 32,
"DataBaseName": "jisheiotdata",
- "OpenDebugMode": true,
+ "OpenDebugMode": false,
"UseTableSessionPoolByDefault": false
},
"FreeRedisOptions": {