业务系统设备批量数据查询接口
This commit is contained in:
parent
0d787c2e1b
commit
3cc29ee4c7
@ -0,0 +1,40 @@
|
|||||||
|
using JiShe.ServicePro.Enums;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace JiShe.IoT.BusinessSystemAggregation.Dto
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 批量查询设备信息输入
|
||||||
|
/// </summary>
|
||||||
|
public class BatchQueryDeviceInfoInput
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 设备类型
|
||||||
|
/// </summary>
|
||||||
|
public DeviceTypeEnum DeviceType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 数据类型
|
||||||
|
/// </summary>
|
||||||
|
public string IoTDataType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 开始时间,最终需要转换为纳秒级时间戳
|
||||||
|
/// </summary>
|
||||||
|
public DateTime BeginTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 结束时间,最终需要转换为纳秒级时间戳
|
||||||
|
/// </summary>
|
||||||
|
public DateTime EndTime { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备地址集合
|
||||||
|
/// </summary>
|
||||||
|
public List<string> DeviceAddresses { get; set; }
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,5 @@
|
|||||||
using JiShe.ServicePro;
|
using JiShe.ServicePro;
|
||||||
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
@ -13,10 +14,17 @@ namespace JiShe.IoT.BusinessSystemAggregation
|
|||||||
public interface IBusinessSystemAggregationService : IApplicationService
|
public interface IBusinessSystemAggregationService : IApplicationService
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 接收业务系统指令信息
|
/// 接收业务系统指令信息,Msg 字段为 ReceiveCommandInfoDto 实体
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="input"></param>
|
/// <param name="input"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input);
|
Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 业务系统批量查询设备数据,Msg 字段为 BatchQueryDeviceInfoInput 实体
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task<HttpDataResult<List<IoTDBDynamicObject>>> BatchQueryDeviceInfoAsync(OpenApiRequest input);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -43,8 +43,4 @@
|
|||||||
<PackageReference Include="JiShe.ServicePro.OneNETManagement.Application.Contracts" />
|
<PackageReference Include="JiShe.ServicePro.OneNETManagement.Application.Contracts" />
|
||||||
|
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<Folder Include="BusinessSystemAggregation\Dto\" />
|
|
||||||
</ItemGroup>
|
|
||||||
</Project>
|
</Project>
|
||||||
@ -1,4 +1,6 @@
|
|||||||
using JiShe.ServicePro;
|
using JiShe.IoT.BusinessSystemAggregation.Dto;
|
||||||
|
using JiShe.ServicePro;
|
||||||
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||||||
using JiShe.ServicePro.Core;
|
using JiShe.ServicePro.Core;
|
||||||
using JiShe.ServicePro.DataChannelManages;
|
using JiShe.ServicePro.DataChannelManages;
|
||||||
@ -8,21 +10,24 @@ using JiShe.ServicePro.Encrypt;
|
|||||||
using JiShe.ServicePro.Enums;
|
using JiShe.ServicePro.Enums;
|
||||||
using JiShe.ServicePro.FreeRedisProvider;
|
using JiShe.ServicePro.FreeRedisProvider;
|
||||||
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||||||
|
using JiShe.ServicePro.IoTDBManagement.TreeModels;
|
||||||
using Mapster;
|
using Mapster;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
namespace JiShe.IoT.BusinessSystemAggregation
|
namespace JiShe.IoT.BusinessSystemAggregation
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 业务系统聚合服务
|
/// 业务系统聚合服务
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService, IIoTDBDataChannelManageService ioTDBDataChannelManageService, IOptions<IoTDBOptions> _ioTDBOptions) : IoTAppService, IBusinessSystemAggregationService
|
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService, IIoTDBDataChannelManageService ioTDBDataChannelManageService, IOptions<IoTDBOptions> _ioTDBOptions, ITreeModelService treeModelService, ILogger<BusinessSystemAggregationService> _logger) : IoTAppService, IBusinessSystemAggregationService
|
||||||
{
|
{
|
||||||
ServerApplicationOptions srverOptions = options.Value;
|
ServerApplicationOptions srverOptions = options.Value;
|
||||||
IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
|
IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 接收业务系统指令信息,缓存进Kafka
|
/// 接收业务系统指令信息
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[AllowAnonymous]
|
[AllowAnonymous]
|
||||||
public async Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input)
|
public async Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input)
|
||||||
@ -30,35 +35,44 @@ namespace JiShe.IoT.BusinessSystemAggregation
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
|
|
||||||
if (verifySignatureReult == false)//签名校验失败
|
|
||||||
{
|
|
||||||
return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(input.Message))
|
var handleResult = HandleOpenApiRequest<ReceiveCommandInfoDto>(input);
|
||||||
|
if (handleResult.Success == false)
|
||||||
{
|
{
|
||||||
return HttpDataResultExtensions.Failed("指令下发内容不能为空", -102, ResponeResultEnum.Fail);
|
return handleResult;
|
||||||
}
|
}
|
||||||
|
var messageBody = handleResult.Data;
|
||||||
//判断是否需要解密
|
|
||||||
ReceiveCommandInfoDto messageBody = null;
|
|
||||||
string tempMessageBody = null;
|
string tempMessageBody = null;
|
||||||
if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
|
|
||||||
{
|
|
||||||
tempMessageBody = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
|
|
||||||
messageBody = tempMessageBody.Deserialize<ReceiveCommandInfoDto>();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>();
|
|
||||||
tempMessageBody = input.Message;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (messageBody == null)
|
//bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
|
||||||
{
|
//if (verifySignatureReult == false)//签名校验失败
|
||||||
return HttpDataResultExtensions.Failed("指令下发内容不能为空", -103, ResponeResultEnum.Fail);
|
//{
|
||||||
}
|
// return HttpDataResultExtensions.Failed("签名校验失败", -101, ResponeResultEnum.NotAllowed);
|
||||||
|
//}
|
||||||
|
|
||||||
|
//if (string.IsNullOrWhiteSpace(input.Message))
|
||||||
|
//{
|
||||||
|
// return HttpDataResultExtensions.Failed("指令下发内容不能为空", -102, ResponeResultEnum.Fail);
|
||||||
|
//}
|
||||||
|
|
||||||
|
////判断是否需要解密
|
||||||
|
//ReceiveCommandInfoDto messageBody = null;
|
||||||
|
//string tempMessageBody = null;
|
||||||
|
//if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
|
||||||
|
//{
|
||||||
|
// tempMessageBody = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
|
||||||
|
// messageBody = tempMessageBody.Deserialize<ReceiveCommandInfoDto>();
|
||||||
|
//}
|
||||||
|
//else
|
||||||
|
//{
|
||||||
|
// tempMessageBody = input.Message;
|
||||||
|
// messageBody = input.Message.Deserialize<ReceiveCommandInfoDto>();
|
||||||
|
//}
|
||||||
|
|
||||||
|
//if (messageBody == null)
|
||||||
|
//{
|
||||||
|
// return HttpDataResultExtensions.Failed("指令下发内容不能为空", -103, ResponeResultEnum.Fail);
|
||||||
|
//}
|
||||||
|
|
||||||
//限定来源类型必须为业务系统
|
//限定来源类型必须为业务系统
|
||||||
if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
|
if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
|
||||||
@ -100,5 +114,140 @@ namespace JiShe.IoT.BusinessSystemAggregation
|
|||||||
return HttpDataResultExtensions.Failed($"指令处理失败,发送异常:{ex.Message}", -106);
|
return HttpDataResultExtensions.Failed($"指令处理失败,发送异常:{ex.Message}", -106);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 业务系统批量查询设备数据,Msg 字段为BatchQueryDeviceInfoInput实体
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
[AllowAnonymous]
|
||||||
|
public async Task<HttpDataResult<List<IoTDBDynamicObject>>> BatchQueryDeviceInfoAsync(OpenApiRequest input)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var handleResult = HandleOpenApiRequest<BatchQueryDeviceInfoInput>(input);
|
||||||
|
if (handleResult.Success == false)
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
|
||||||
|
}
|
||||||
|
var messageBody = handleResult.Data;
|
||||||
|
|
||||||
|
if (messageBody.DeviceAddresses == null || messageBody.DeviceAddresses.Count <= 0)
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lua脚本
|
||||||
|
string luaScript = @"
|
||||||
|
local hashKey = KEYS[1]
|
||||||
|
local fieldKeys = ARGV
|
||||||
|
return redis.call('HMGET', hashKey, unpack(fieldKeys))";
|
||||||
|
|
||||||
|
//执行脚本
|
||||||
|
var result = await FreeRedisProvider.Instance.EvalAsync
|
||||||
|
(
|
||||||
|
luaScript,
|
||||||
|
new[] { RedisConst.CacheAllDeviceInfoHashKey },
|
||||||
|
messageBody.DeviceAddresses.ToArray()
|
||||||
|
);
|
||||||
|
|
||||||
|
List<DeviceCacheInfos> deviceCacheInfos = new List<DeviceCacheInfos>();
|
||||||
|
|
||||||
|
// 处理返回结果
|
||||||
|
if (result is object[] values)
|
||||||
|
{
|
||||||
|
foreach (var value in values)
|
||||||
|
{
|
||||||
|
var tempFocusInfo = ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(value as string);
|
||||||
|
deviceCacheInfos.Add(tempFocusInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<IoTDBDynamicObject> queryResult = new List<IoTDBDynamicObject>();
|
||||||
|
foreach (var deviceAddress in messageBody.DeviceAddresses)
|
||||||
|
{
|
||||||
|
var deviceCacheInfo = deviceCacheInfos.FirstOrDefault(x => x.DeviceAddress == deviceAddress);
|
||||||
|
|
||||||
|
if (deviceCacheInfo == null)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(BatchQueryDeviceInfoAsync)} 业务系统批量查询设备数据,设备地址:{deviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
var pageResult = await treeModelService.DeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
|
||||||
|
{
|
||||||
|
DeviceAddress = deviceAddress,
|
||||||
|
DeviceType = messageBody.DeviceType,
|
||||||
|
IoTDataType = messageBody.IoTDataType,
|
||||||
|
IsNeedPaging = false,
|
||||||
|
});
|
||||||
|
|
||||||
|
//todo 根据业务系统时间间隔要求进行过滤
|
||||||
|
if (pageResult.Items != null && pageResult.Items.Count > 0)
|
||||||
|
{
|
||||||
|
queryResult.AddRange(pageResult.Items);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return HttpDataResultExtensions.Success(queryResult,"查询成功");
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
|
||||||
|
return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"查询设备数据失败,发送异常:{ex.Message}", -106);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 处理开放接口请求
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T"></typeparam>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
private HttpDataResult<T> HandleOpenApiRequest<T>(OpenApiRequest input) where T : class
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(input.Message) || string.IsNullOrWhiteSpace(input.Message) || string.IsNullOrWhiteSpace(input.Signature))
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<T>(null, "请求参数不能为空", -1101);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (input.Timestamp <= 946656000000)//时间戳小于2000年,视为错误
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<T>(null, "时间戳异常", -1102);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool verifySignatureReult = EncryptUtil.OpenApiVerifySignature(input.Message, input.Nonce, input.Timestamp, input.Signature, srverOptions.SignatureToken);
|
||||||
|
if (verifySignatureReult == false)//签名校验失败
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<T>(null, "签名校验失败", -1103);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//判断是否需要解密
|
||||||
|
T messageBody = default;
|
||||||
|
string tempMessageBody = null;
|
||||||
|
if (srverOptions.IsAesEncrypted && !string.IsNullOrWhiteSpace(srverOptions.AesSecurityKey))
|
||||||
|
{
|
||||||
|
tempMessageBody = EncryptUtil.OpenApiDecrypto(input.Message, srverOptions.AesSecurityKey);
|
||||||
|
messageBody = tempMessageBody.Deserialize<T>();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tempMessageBody = input.Message;
|
||||||
|
messageBody = input.Message.Deserialize<T>();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (messageBody == null)
|
||||||
|
{
|
||||||
|
return HttpDataResultExtensions.Failed<T>(null, "获取数据体失败", -1104);
|
||||||
|
}
|
||||||
|
|
||||||
|
return HttpDataResultExtensions.Success(messageBody, tempMessageBody);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -272,7 +272,7 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
deviceCacheInfos.IoTPlatformResponse = null;
|
deviceCacheInfos.IoTPlatformResponse = null;
|
||||||
deviceCacheInfos.PlatformPassword = null;
|
deviceCacheInfos.PlatformPassword = null;
|
||||||
|
|
||||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos);
|
FreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -16,7 +16,7 @@ namespace JiShe.IoT
|
|||||||
public abstract class IoTAppService : ApplicationService
|
public abstract class IoTAppService : ApplicationService
|
||||||
{
|
{
|
||||||
protected IFreeSqlProvider FreeSqlDbContext => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();
|
protected IFreeSqlProvider FreeSqlDbContext => LazyServiceProvider.LazyGetRequiredService<IFreeSqlProvider>();
|
||||||
protected IFreeRedisProvider RedisProvider => LazyServiceProvider.LazyGetRequiredService<IFreeRedisProvider>();
|
protected IFreeRedisProvider FreeRedisProvider => LazyServiceProvider.LazyGetRequiredService<IFreeRedisProvider>();
|
||||||
|
|
||||||
protected IoTAppService()
|
protected IoTAppService()
|
||||||
{
|
{
|
||||||
|
|||||||
@ -41,7 +41,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|||||||
{
|
{
|
||||||
// 为订阅回调创建独立的 FreeSql 客户端
|
// 为订阅回调创建独立的 FreeSql 客户端
|
||||||
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
||||||
var callbackFreeSql = RedisProvider;
|
var callbackFreeSql = FreeRedisProvider;
|
||||||
|
|
||||||
|
|
||||||
// 订阅频道
|
// 订阅频道
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user