实现OneNET设备重推、指令下发返回解析

This commit is contained in:
ChenYi 2025-08-05 17:09:39 +08:00
parent ff8fd898f6
commit 8f78a15cbc
5 changed files with 71 additions and 27 deletions

@ -1 +1 @@
Subproject commit 8df0a54f551a1f894014c9b0e8eca5d7c5c4f51b
Subproject commit 6a5258a04fbeec29f3dae96f375230204ab2047b

View File

@ -1,4 +1,5 @@
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.Enums;
using System.ComponentModel.DataAnnotations;
namespace JiShe.IoT.DeviceAggregation
@ -7,26 +8,8 @@ namespace JiShe.IoT.DeviceAggregation
/// <summary>
/// 设备命令
/// </summary>
public class DeviceCommandForApiInput
public class DeviceCommandForApiInput:IdInput
{
/// <summary>
/// 表通信地址
/// </summary>
[Required(ErrorMessage = "设备地址不能为空")]
public string DeviceAddress { get; set; }
/// <summary>
/// 物联网平台类型
/// </summary>
[Required(ErrorMessage = "物联网平台类型不能为空")]
public IoTPlatformTypeEnum IoTPlatform { get; set; }
/// <summary>
/// 设备在物联网平台中对应的产品Id
/// </summary>
[Required(ErrorMessage = "产品Id不能为空")]
public string IoTPlatformProductId { get; set; }
/// <summary>
/// 设备在物联网平台中发送的命令内容JSON格式
/// </summary>

View File

@ -267,6 +267,8 @@ namespace JiShe.IoT.DeviceAggregation
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos);
return input.Adapt<DeviceManagementInfoDto>();
}
@ -279,13 +281,13 @@ namespace JiShe.IoT.DeviceAggregation
{
try
{
var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(input.DeviceAddress);
var deviceInfo = await deviceAppService.FindByIdAsync(input);
//将指令存储Kafka的OneNET主题中
var commandRequest = new OpenApiRequest()
{
Message = new ReceiveCommandInfoDto()
{
DeviceAddress = input.DeviceAddress,
DeviceAddress = deviceInfo.DeviceAddress,
Commands = input.CommandContent.Deserialize<Dictionary<string,object>>(),
DeviceType = DeviceTypeEnum.Focus,//todo 设备类型 需要跟设备统一什么情况下知道具体设备类型
SourceType = DeviceTelemetrySourceTypeEnum.AdminSystem,
@ -294,8 +296,7 @@ namespace JiShe.IoT.DeviceAggregation
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
{
await producerService.ProduceAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, $"{GuidGenerator.Create()}", commandRequest);
return true;
return await DeviceCommandInfoToOneNET(deviceInfo, commandRequest);
}
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
{
@ -568,6 +569,45 @@ namespace JiShe.IoT.DeviceAggregation
throw;
}
}
/// <summary>
/// 发送OneNET平台设备指令
/// </summary>
/// <param name="deviceInfo"></param>
/// <param name="commandRequest"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
public async Task<bool> DeviceCommandInfoToOneNET(DeviceManagementInfoDto deviceInfo, OpenApiRequest commandRequest)
{
try
{
//检查下设备是否在线
var deviceOnlineStatus = await oneNETDeviceService.DeviceInfoDetailAsync(new DeviceInfoDetailInput()
{
DeviceName = deviceInfo.IoTPlatformDeviceOpenInfo,
OneNETAccountId = deviceInfo.IoTPlatformAccountId,
ProductId = deviceInfo.IoTPlatformProductId,
});
if (deviceOnlineStatus == null || deviceOnlineStatus.Code != ResponeResultEnum.Success)
{
throw new UserFriendlyException("获取平台设备信息失败");
}
if (deviceOnlineStatus.Data.Status != 1)
{
throw new UserFriendlyException("设备不在线");
}
await producerService.ProduceAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, $"{GuidGenerator.Create()}", commandRequest);
return true;
}
catch (Exception)
{
throw;
}
}
#endregion
#region CTWing

View File

@ -83,6 +83,13 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
return SubscribeAck.Fail();
}
//更新设备数据缓存到Redis
DeviceCacheInfos deviceCacheInfos = deviceEntity.Adapt<DeviceCacheInfos>();
deviceCacheInfos.IoTPlatformResponse = null;
deviceCacheInfos.PlatformPassword = null;
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
return SubscribeAck.Success();
}
catch (Exception)

View File

@ -66,11 +66,25 @@ namespace JiShe.IoT.Controllers
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost("RepushDeviceInfoToIoTPlatform")]
[HttpPost(nameof(RepushDeviceInfoToIoTPlatform))]
[SwaggerOperation(summary: "重新推送设备信息到物联网平台", Tags = new[] { "AggregationDevice" })]
public Task<DeviceManagementInfoDto> RepushDeviceInfoToIoTPlatform(IdInput input)
{
return _deviceAggregationService.RepushDeviceInfoToIoTPlatform(input);
}
/// <summary>
/// 发送设备指令信息
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[HttpPost(nameof(DeviceCommandForApiAsync))]
[SwaggerOperation(summary: "发送设备指令信息", Tags = new[] { "AggregationDevice" })]
public Task<bool> DeviceCommandForApiAsync(DeviceCommandForApiInput input)
{
return _deviceAggregationService.DeviceCommandForApiAsync(input);
}
}
}