完善指令下发逻辑处理。

This commit is contained in:
ChenYi 2025-09-19 11:36:17 +08:00
parent b90a2cabc2
commit 0d787c2e1b
6 changed files with 121 additions and 15 deletions

View File

@ -1,5 +1,5 @@
CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity'; CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, MessageType STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo'; CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, TelemetryType INT32 FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity'; CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';

View File

@ -15,5 +15,15 @@ namespace JiShe.IoT.DeviceAggregation
/// </summary> /// </summary>
[Required(ErrorMessage = "命令内容不能为空")] [Required(ErrorMessage = "命令内容不能为空")]
public string CommandContent { get; set; } public string CommandContent { get; set; }
/// <summary>
/// 设备类型
/// </summary>
public DeviceTypeEnum? DeviceType { get; set; }
/// <summary>
/// 设备指令类型
/// </summary>
public DeviceTelemetryCommandTypeEnum? TelemetryType { get; set; }
} }
} }

View File

@ -1,10 +1,14 @@
using JiShe.ServicePro; using JiShe.ServicePro;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Core; using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos; using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto; using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt; using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.Enums; using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using Mapster;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
namespace JiShe.IoT.BusinessSystemAggregation namespace JiShe.IoT.BusinessSystemAggregation
@ -12,9 +16,10 @@ namespace JiShe.IoT.BusinessSystemAggregation
/// <summary> /// <summary>
/// 业务系统聚合服务 /// 业务系统聚合服务
/// </summary> /// </summary>
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService) : IoTAppService, IBusinessSystemAggregationService public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService, IIoTDBDataChannelManageService ioTDBDataChannelManageService, IOptions<IoTDBOptions> _ioTDBOptions) : IoTAppService, IBusinessSystemAggregationService
{ {
ServerApplicationOptions srverOptions = options.Value; ServerApplicationOptions srverOptions = options.Value;
IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
/// <summary> /// <summary>
/// 接收业务系统指令信息缓存进Kafka /// 接收业务系统指令信息缓存进Kafka
@ -38,14 +43,16 @@ namespace JiShe.IoT.BusinessSystemAggregation
//判断是否需要解密 //判断是否需要解密
ReceiveCommandInfoDto messageBody = null; ReceiveCommandInfoDto messageBody = null;
string tempMessageBody = null;
if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey)) if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
{ {
string tempRaw = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey); tempMessageBody = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
messageBody = tempRaw.Deserialize<ReceiveCommandInfoDto>(); messageBody = tempMessageBody.Deserialize<ReceiveCommandInfoDto>();
} }
else else
{ {
messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>(); messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>();
tempMessageBody = input.Message;
} }
if (messageBody == null) if (messageBody == null)
@ -61,9 +68,20 @@ namespace JiShe.IoT.BusinessSystemAggregation
var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress); var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
if (deviceInfo == null)
{
return HttpDataResultExtensions.Failed("设备不存在", -1041, ResponeResultEnum.Fail);
}
var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt<DeviceCacheInfos>(), tempMessageBody);
//将指令存储Kafka的OneNET主题中 //将指令存储Kafka的OneNET主题中
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET) if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{ {
//数据写入遥测任务数据存储通道
await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input); await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
} }
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)

View File

@ -1,16 +1,21 @@
using JiShe.IoT.DeviceAggregation.Dto; using JiShe.IoT.DeviceAggregation.Dto;
using JiShe.ServicePro; using JiShe.ServicePro;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Core; using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos; using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.DeviceManagement.DeviceInfos.Dto; using JiShe.ServicePro.DeviceManagement.DeviceInfos.Dto;
using JiShe.ServicePro.DeviceManagement.Permissions; using JiShe.ServicePro.DeviceManagement.Permissions;
using JiShe.ServicePro.Dto; using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums; using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.OneNETManagement.OneNETDevices; using JiShe.ServicePro.OneNETManagement.OneNETDevices;
using JiShe.ServicePro.OneNETManagement.OneNETProducts; using JiShe.ServicePro.OneNETManagement.OneNETProducts;
using Mapster; using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp; using Volo.Abp;
namespace JiShe.IoT.DeviceAggregation namespace JiShe.IoT.DeviceAggregation
@ -22,8 +27,13 @@ namespace JiShe.IoT.DeviceAggregation
/// <param name="deviceAppService">设备服务</param> /// <param name="deviceAppService">设备服务</param>
/// <param name="oneNETDeviceService">OneNET设备服务</param> /// <param name="oneNETDeviceService">OneNET设备服务</param>
/// <param name="redisPubSubService">Redis发布订阅服务</param> /// <param name="redisPubSubService">Redis发布订阅服务</param>
public class DeviceAggregationService(ILogger<DeviceAggregationService> logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IReliableRedisPubSubService redisPubSubService) : IoTAppService, IDeviceAggregationService /// <param name="ioTDBDataChannelManageService">数据通道</param>
/// <param name="_ioTDBOptions">IoTDBOptions</param>
public class DeviceAggregationService(ILogger<DeviceAggregationService> logger, IDeviceAppService deviceAppService, IOneNETDeviceService oneNETDeviceService, IReliableRedisPubSubService redisPubSubService, IIoTDBDataChannelManageService ioTDBDataChannelManageService,IOptions<IoTDBOptions> _ioTDBOptions) : IoTAppService, IDeviceAggregationService
{ {
IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
/// <summary> /// <summary>
/// 管理后台创建设备信息 /// 管理后台创建设备信息
/// </summary> /// </summary>
@ -279,6 +289,10 @@ namespace JiShe.IoT.DeviceAggregation
try try
{ {
var deviceInfo = await deviceAppService.FindByIdAsync(input); var deviceInfo = await deviceAppService.FindByIdAsync(input);
if (deviceInfo == null)
{
throw new UserFriendlyException($"设备不存在");
}
//将指令存储Kafka的OneNET主题中 //将指令存储Kafka的OneNET主题中
var commandRequest = new OpenApiRequest() var commandRequest = new OpenApiRequest()
{ {
@ -286,14 +300,21 @@ namespace JiShe.IoT.DeviceAggregation
{ {
DeviceAddress = deviceInfo.DeviceAddress, DeviceAddress = deviceInfo.DeviceAddress,
Commands = input.CommandContent.Deserialize<Dictionary<string,object>>(), Commands = input.CommandContent.Deserialize<Dictionary<string,object>>(),
DeviceType = DeviceTypeEnum.Focus,//todo 设备类型 需要跟设备统一什么情况下知道具体设备类型 DeviceType = input.DeviceType ?? DeviceTypeEnum.Focus,//todo 设备类型 需要跟设备统一什么情况下知道具体设备类型
SourceType = DeviceTelemetrySourceTypeEnum.AdminSystem, SourceType = DeviceTelemetrySourceTypeEnum.AdminSystem,
TelemetryType = input.TelemetryType ?? DeviceTelemetryCommandTypeEnum.,
IoTPlatform = deviceInfo.IoTPlatform,
}.Serialize(), }.Serialize(),
}; };
var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, commandRequest, deviceInfo.Adapt<DeviceCacheInfos>(), commandRequest.Message);
//数据写入遥测任务数据存储通道
await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET) if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{ {
return await DeviceCommandInfoToOneNET(deviceInfo, commandRequest); return await DeviceCommandInfoToOneNET(deviceInfo, packetTaskInfo);
} }
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
{ {
@ -571,10 +592,10 @@ namespace JiShe.IoT.DeviceAggregation
/// 发送OneNET平台设备指令 /// 发送OneNET平台设备指令
/// </summary> /// </summary>
/// <param name="deviceInfo"></param> /// <param name="deviceInfo"></param>
/// <param name="commandRequest"></param> /// <param name="packetTaskInfo"></param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="UserFriendlyException"></exception> /// <exception cref="UserFriendlyException"></exception>
public async Task<bool> DeviceCommandInfoToOneNET(DeviceManagementInfoDto deviceInfo, OpenApiRequest commandRequest) public async Task<bool> DeviceCommandInfoToOneNET(DeviceManagementInfoDto deviceInfo, DeviceTelemetryPacketTaskInfo packetTaskInfo)
{ {
try try
{ {
@ -596,7 +617,7 @@ namespace JiShe.IoT.DeviceAggregation
throw new UserFriendlyException("设备不在线"); throw new UserFriendlyException("设备不在线");
} }
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, commandRequest); await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo);
return true; return true;
} }
catch (Exception) catch (Exception)

View File

@ -1,5 +1,13 @@
using JiShe.ServicePro;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider; using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using Volo.Abp;
namespace JiShe.IoT namespace JiShe.IoT
{ {
@ -14,5 +22,54 @@ namespace JiShe.IoT
{ {
LocalizationResource = typeof(IoTResource); LocalizationResource = typeof(IoTResource);
} }
/// <summary>
/// 获取设备遥测指令信息
/// </summary>
/// <param name="iotDBOptions">IoTDBOptions</param>
/// <param name="input">请求原始对象</param>
/// <param name="deviceInfo">设备信息</param>
/// <param name="messageBody">明文消息体</param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
protected DeviceTelemetryPacketTaskInfo GetDeviceTelemetryPacketTaskInfo(IoTDBOptions iotDBOptions, OpenApiRequest input, DeviceCacheInfos deviceInfo,string messageBody)
{
try
{
if (iotDBOptions == null || string.IsNullOrWhiteSpace(iotDBOptions.DataBaseName) || input == null || deviceInfo == null || string.IsNullOrWhiteSpace(messageBody))
{
throw new UserFriendlyException($"设备遥测指令创建失败,参数信息异常。");
}
//反序列化消息数据,得到数据实体
ReceiveCommandInfoDto commandIssueInfo = input.Message.Deserialize<ReceiveCommandInfoDto>();
var oneNETIssueMessageEntity = new DeviceTelemetryPacketTaskInfo()
{
DataBaseName = iotDBOptions.DataBaseName,
DeviceType = $"{commandIssueInfo.DeviceType}",
DeviceAddress = commandIssueInfo.DeviceAddress,
IssueRawMessage = input.Serialize(),
IoTDataType = IoTDBDataTypeConst.Command,
TelemetryType = (int)commandIssueInfo.TelemetryType,
TelemetrySource = (int)commandIssueInfo.SourceType,
IoTPlatform = (int)commandIssueInfo.IoTPlatform,
IoTPlatformProductId = deviceInfo.IoTPlatformProductId,
IoTPlatformDeviceOpenInfo = deviceInfo.IoTPlatformDeviceOpenInfo,
IoTPlatformAccountId = deviceInfo.IoTPlatformAccountId,
AccountPhoneNumber = deviceInfo.AccountPhoneNumber,
IoTPlatformProductName = deviceInfo.IoTPlatformProductName,
IssuePayload = messageBody,
RetryCount = 0,
IssueStatus = (int)DeviceCommandIssueStatusEnum.Unprocessed
};
return oneNETIssueMessageEntity;
}
catch (Exception)
{
throw;
}
}
} }
} }

View File

@ -5,13 +5,13 @@
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
"Password": "Lixiao@1980", "Password": "Lixiao@1980",
//"TreeModelClusterList": [ "192.168.111.42:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], //"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TableModelClusterList": [ "192.168.111.42:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], //"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TreeModelClusterList": [ "192.168.111.42:30710" ], "TreeModelClusterList": [ "47.110.53.196:30710" ],
"TableModelClusterList": [ "192.168.111.42:30710" ], "TableModelClusterList": [ "47.110.53.196:30710" ],
"PoolSize": 32, "PoolSize": 32,
"DataBaseName": "jisheiotdata", "DataBaseName": "jisheiotdata",
"OpenDebugMode": true, "OpenDebugMode": false,
"UseTableSessionPoolByDefault": false "UseTableSessionPoolByDefault": false
}, },
"FreeRedisOptions": { "FreeRedisOptions": {