554 lines
27 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.IoT.BusinessSystemAggregation.Dto;
using JiShe.IoT.DeviceAggregation;
using JiShe.ServicePro;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.DeviceManagement.ThingModels;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TreeModels;
using JiShe.ServicePro.OneNETManagement.OneNETDevices;
using JiShe.ServicePro.OneNETManagement.OneNETProducts;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp;
using Volo.Abp.Auditing;
using static Volo.Abp.Ui.LayoutHooks.LayoutHooks;
namespace JiShe.IoT.BusinessSystemAggregation
{
/// <summary>
/// Business system aggregation service.
/// </summary>
/// <param name="_logger">Logger.</param>
/// <param name="_deviceAppService">Device service.</param>
/// <param name="_oneNETDeviceService">OneNET device service.</param>
/// <param name="_redisPubSubService">Redis publish/subscribe service.</param>
/// <param name="_ioTDBDataChannelManageService">Data channel service.</param>
/// <param name="_ioTDBOptions">IoTDB options.</param>
/// <param name="_oneNETProductService">OneNET product service.</param>
/// <param name="_deviceThingModelService">Device-side thing model service.</param>
/// <param name="_platformThingModelInfoAppService">Platform-side thing model service.</param>
/// <param name="_deviceUpgradeRecordService">Device upgrade record service.</param>
/// <param name="_serverOptions">Application server configuration.</param>
/// <param name="_treeModelService">IoTDB tree model service.</param>
[DisableAuditing]
public class BusinessSystemAggregationService(
IDeviceAppService _deviceAppService,
IOneNETDeviceService _oneNETDeviceService,
IReliableRedisPubSubService _redisPubSubService,
IIoTDBDataChannelManageService _ioTDBDataChannelManageService,
IOptions<IoTDBOptions> _ioTDBOptions,
IOptions<ServerApplicationOptions> _serverOptions,
IOneNETProductService _oneNETProductService,
IDeviceThingModelManagementAppService _deviceThingModelService,
IIoTPlatformThingModelInfoAppService _platformThingModelInfoAppService,
IDeviceUpgradeRecordService _deviceUpgradeRecordService,
ITreeModelService _treeModelService,
ILogger<BusinessSystemAggregationService> _logger)
: IoTDeviceBasicAppService(_logger,
_deviceAppService,
_oneNETDeviceService,
_redisPubSubService,
_ioTDBDataChannelManageService,
_ioTDBOptions,
_serverOptions,
_oneNETProductService,
_deviceThingModelService,
_platformThingModelInfoAppService,
_deviceUpgradeRecordService,
_treeModelService), IBusinessSystemAggregationService
{
/// <summary>
/// Lua script that runs HMGET against the hash named by KEYS[1], passing every ARGV entry
/// as a field name, and returns the values HMGET yields.
/// </summary>
private const string LUA_SCRIPT = @"
local hashKey = KEYS[1]
local fieldKeys = ARGV
return redis.call('HMGET', hashKey, unpack(fieldKeys))";
/// <summary>
/// Receives a "set" command pushed by a business system and forwards it to the device's IoT platform.
/// </summary>
/// <param name="input">Open API request; its payload is parsed into a <c>ReceiveCommandInfoDto</c>.</param>
/// <returns>
/// A success result once the command has been published. Otherwise a failed result: the failure produced by
/// <c>HandleOpenApiRequest</c>, -103 (no commands), -104 (source type is not the business system),
/// -1041 (device not found), -105 (platform is neither OneNET nor CTWing) or -106 (unexpected exception).
/// </returns>
/// <exception cref="UserFriendlyException">
/// Thrown for OneNET devices when the platform thing model is missing, a command key is not part of it,
/// a key maps to the firmware-upgrade field, or a key is the special command on a device that has a
/// device thing model configured.
/// </exception>
[AllowAnonymous]
public async Task<HttpDataResult> ReceiveSetCommandInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<ReceiveCommandInfoDto>(input, serverApplicationOptions);
        if (handleResult.Success == false)
        {
            return handleResult;
        }

        var messageBody = handleResult.Data;
        if (messageBody == null || messageBody.Commands == null || messageBody.Commands.Count <= 0)
        {
            return HttpDataResultExtensions.Failed("设备指令不能为空", -103, ResponeResultEnum.Fail);
        }

        // Only the business system is allowed as the command source.
        if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
        {
            return HttpDataResultExtensions.Failed("设备指令来源类型错误业务系统传固定值2", -104, ResponeResultEnum.Fail);
        }

        var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
        if (deviceInfo == null)
        {
            return HttpDataResultExtensions.Failed("设备不存在", -1041, ResponeResultEnum.Fail);
        }

        var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt<DeviceCacheInfos>(), messageBody.Commands.Serialize());

        if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
        {
            // Load the platform-side thing model of the device's product and validate every command key against it.
            var platformThingModelInfo = await platformThingModelInfoAppService.FindByPlatformProductIdAsync(new IdInput<string>() { Id = deviceInfo.IoTPlatformProductId });
            if (platformThingModelInfo == null)
            {
                throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}的平台端物模型信息不存在。");
            }

            foreach (var item in messageBody.Commands)
            {
                var matchedProperty = platformThingModelInfo.FirstOrDefault(d => d.IoTPlatformRawFieldName == item.Key);
                if (matchedProperty == null)
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备设备{deviceInfo.DeviceAddress}平台端物模型信息不存在属性标识符{item.Key}。");
                }

                // Firmware upgrades must not be triggered through this endpoint.
                // string.Equals is used so a null field name cannot raise a NullReferenceException.
                if (string.Equals(matchedProperty.StandardFieldName, ThingModelFixedTypeConst.FIRMWARE_UPGRADE, StringComparison.OrdinalIgnoreCase))
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是升级指令操作,被禁止。");
                }

                // The special command is rejected for devices that require and already have a device thing model.
                if (deviceInfo.IsNeedConfigDeviceModel
                    && deviceInfo.DeviceThingModelDataId.HasValue
                    && string.Equals(item.Key, ThingModelFixedTypeConst.SpecialCommand, StringComparison.OrdinalIgnoreCase))
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是特殊指令操作,被禁止。");
                }
            }

            // Queue the packet on the telemetry task channel, then publish the request on the OneNET command event.
            await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
            await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
        }
        else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
        {
            await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input);
        }
        else
        {
            return HttpDataResultExtensions.Failed("指令处理失败,当前设备平台类型异常", -105);
        }

        return HttpDataResultExtensions.Success("指令下发成功");
    }
    catch (UserFriendlyException)
    {
        // Let user-facing validation errors propagate unchanged.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "接收业务系统指令信息时发生异常");
        return HttpDataResultExtensions.Failed("指令处理失败,发送异常", -106);
    }
}
/// <summary>
/// Receives a "get property" command from a business system and, for OneNET devices, returns the
/// requested property values.
/// </summary>
/// <param name="input">Open API request; its payload is parsed into a <c>ReceiveCommandInfoDto</c>.</param>
/// <returns>
/// For OneNET devices, a success result carrying the values returned by <c>DevicePropertyValueToOneNET</c>.
/// For CTWing devices, a success result without data (nothing is queried). Otherwise a failed result:
/// -101 (request handling failed), -103 (no properties requested), -104 (source type is not the business
/// system), -105 (device not found or unsupported platform) or -106 (thing model missing, nothing left to
/// query, no data returned, or unexpected exception).
/// </returns>
[AllowAnonymous]
public async Task<HttpDataResult<Dictionary<string, object>>> ReceiveGetCommandInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<ReceiveCommandInfoDto>(input, serverApplicationOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed<Dictionary<string, object>>("获取数据失败,签名校验异常", -101);
        }

        var messageBody = handleResult.Data;
        if (messageBody == null || messageBody.Commands == null || messageBody.Commands.Count <= 0)
        {
            return HttpDataResultExtensions.Failed<Dictionary<string, object>>("获取数据失败,设备属性值不能为空", -103, ResponeResultEnum.Fail);
        }

        // Only the business system is allowed as the command source.
        if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
        {
            return HttpDataResultExtensions.Failed<Dictionary<string, object>>("获取数据失败来源类型错误业务系统传固定值2", -104, ResponeResultEnum.Fail);
        }

        var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
        if (deviceInfo == null)
        {
            return HttpDataResultExtensions.Failed<Dictionary<string, object>>("设备不存在", -105, ResponeResultEnum.Fail);
        }

        // Built before any filtering below, so the packet carries the command list exactly as received.
        var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt<DeviceCacheInfos>(), messageBody.Commands.Serialize());

        if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
        {
            // Load the platform-side thing model of the device's product and validate every requested key against it.
            var platformThingModelInfo = await platformThingModelInfoAppService.FindByPlatformProductIdAsync(new IdInput<string>() { Id = deviceInfo.IoTPlatformProductId });
            if (platformThingModelInfo == null)
            {
                return HttpDataResultExtensions.Failed<Dictionary<string, object>>($"业务系统推送指令时设备{deviceInfo.DeviceAddress}的平台端物模型信息不存在。", -106, ResponeResultEnum.Fail);
            }

            // Logs why a key is rejected and removes it from the request.
            void DropCommand(string key, string reason)
            {
                _logger.LogError(reason);
                messageBody.Commands.RemoveAll(d => d.Key == key);
            }

            // Iterate over a snapshot: DropCommand mutates messageBody.Commands, and removing from a
            // collection while enumerating it directly can throw InvalidOperationException.
            foreach (var item in messageBody.Commands.ToList())
            {
                var matchedProperty = platformThingModelInfo.FirstOrDefault(d => d.IoTPlatformRawFieldName == item.Key);
                if (matchedProperty == null)
                {
                    DropCommand(item.Key, $"业务系统推送指令时设备设备{deviceInfo.DeviceAddress}平台端物模型信息不存在属性标识符{item.Key}。");
                    continue;
                }

                // The firmware-upgrade field is excluded.
                // string.Equals is used so a null field name cannot raise a NullReferenceException.
                if (string.Equals(matchedProperty.StandardFieldName, ThingModelFixedTypeConst.FIRMWARE_UPGRADE, StringComparison.OrdinalIgnoreCase))
                {
                    DropCommand(item.Key, $"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是升级指令操作,被禁止。");
                    continue;
                }

                // The special command is excluded for devices that require and already have a device thing model.
                if (deviceInfo.IsNeedConfigDeviceModel
                    && deviceInfo.DeviceThingModelDataId.HasValue
                    && string.Equals(item.Key, ThingModelFixedTypeConst.SpecialCommand, StringComparison.OrdinalIgnoreCase))
                {
                    DropCommand(item.Key, $"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是特殊指令操作,被禁止。");
                }
            }

            // Queue the packet on the telemetry task channel (this happens even if every key was dropped).
            await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));

            if (messageBody.Commands == null || messageBody.Commands.Count <= 0)
            {
                return HttpDataResultExtensions.Failed<Dictionary<string, object>>("获取数据失败OneNET平台未返回数据", -106);
            }

            var queryResult = await DevicePropertyValueToOneNET(deviceInfo, new DevicePropertyValueForApiInput() { PropertyList = messageBody.Commands.Select(d => d.Key).ToList() });
            if (queryResult == null || queryResult.Count <= 0)
            {
                return HttpDataResultExtensions.Failed<Dictionary<string, object>>("获取数据失败OneNET平台未返回数据", -106);
            }

            return HttpDataResultExtensions.Success<Dictionary<string, object>>(queryResult);
        }
        else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
        {
            // CTWing property queries are not implemented: nothing is published and the generic success below is returned.
        }
        else
        {
            return HttpDataResultExtensions.Failed<Dictionary<string, object>>("指令处理失败,当前设备平台类型异常", -105);
        }

        return HttpDataResultExtensions.Success<Dictionary<string, object>>("指令下发成功");
    }
    catch (UserFriendlyException)
    {
        // Let user-facing validation errors propagate unchanged.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "接收业务系统指令信息时发生异常");
        return HttpDataResultExtensions.Failed<Dictionary<string, object>>("指令处理失败,发送异常", -106);
    }
}
/// <summary>
/// Batch query of device data for a business system. The request payload is parsed into a
/// <c>BatchQueryDeviceDataInfoInput</c>.
/// </summary>
/// <param name="input">Open API request carrying the device addresses, device type, data type and time range.</param>
/// <returns>
/// A success result with the combined rows of every device that has a cache entry; devices without a
/// cache entry are logged and skipped. Otherwise a failed result: the failure produced by
/// <c>HandleOpenApiRequest</c>, -103 (no device addresses), -107 (an address failed validation)
/// or -106 (unexpected exception).
/// </returns>
[AllowAnonymous]
public async Task<HttpDataResult<List<IoTDBDynamicObject>>> BatchQueryDeviceDataInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<BatchQueryDeviceDataInfoInput>(input, serverApplicationOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing payload is treated like an empty address list instead of surfacing as a NullReferenceException.
        if (messageBody?.DeviceAddresses == null || messageBody.DeviceAddresses.Count <= 0)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
        }

        // Reject the whole request as soon as one address fails validation.
        foreach (var deviceAddress in messageBody.DeviceAddresses)
        {
            if (!IsValidDeviceAddress(deviceAddress))
            {
                return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"设备地址格式不正确: {deviceAddress}", -107);
            }
        }

        // Fetch the cache entries of all addresses from the device-info hash in a single script call.
        var result = await FreeRedisProvider.Instance.EvalAsync
        (
            LUA_SCRIPT,
            new[] { RedisConst.CacheAllDeviceInfoHashKey },
            messageBody.DeviceAddresses.ToArray()
        );

        // Decode the reply; a null slot is kept so list positions stay aligned with the requested addresses.
        var deviceCacheInfos = new List<DeviceCacheInfos>();
        if (result is object[] values)
        {
            foreach (var value in values)
            {
                deviceCacheInfos.Add(value != null
                    ? ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(value as string)
                    : null);
            }
        }

        var queryResult = new List<IoTDBDynamicObject>();
        for (int i = 0; i < messageBody.DeviceAddresses.Count; i++)
        {
            var deviceAddress = messageBody.DeviceAddresses[i];
            var deviceCacheInfo = i < deviceCacheInfos.Count ? deviceCacheInfos[i] : null;
            if (deviceCacheInfo == null)
            {
                _logger.LogError($"{nameof(BatchQueryDeviceDataInfoAsync)} 业务系统批量查询设备数据,设备地址:{deviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
                continue;
            }

            var pageResult = await treeModelService.OpenRequestDeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
            {
                DeviceAddress = deviceAddress,
                DeviceType = messageBody.DeviceType,
                IoTDataType = messageBody.IoTDataType,
                IsNeedPaging = false,
                StartCreationTime = messageBody.BeginTime,
                EndCreationTime = messageBody.EndTime
            });

            // TODO: filter by the time interval required by the business system.
            if (pageResult.Items != null && pageResult.Items.Count > 0)
            {
                queryResult.AddRange(pageResult.Items);
            }
        }

        return HttpDataResultExtensions.Success(queryResult, "查询成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "批量查询设备数据时发生异常");
        return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "查询设备数据失败", -106);
    }
}
/// <summary>
/// Single-device data query for a business system. The request payload is parsed into a
/// <c>QueryDeviceDataInfoInput</c>.
/// </summary>
/// <param name="input">Open API request carrying the device address, optional sub-device address, device type, data type and time range.</param>
/// <returns>
/// A success result with the rows found for the device. Otherwise a failed result: the failure produced by
/// <c>HandleOpenApiRequest</c>, -103 (blank device address), -107 (address failed validation),
/// -108 (no cache entry for the device) or -106 (unexpected exception).
/// </returns>
[AllowAnonymous]
public async Task<HttpDataResult<List<IoTDBDynamicObject>>> QueryDeviceDataInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<QueryDeviceDataInfoInput>(input, serverApplicationOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing payload is treated like a blank address instead of surfacing as a NullReferenceException.
        if (string.IsNullOrWhiteSpace(messageBody?.DeviceAddress))
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
        }

        if (!IsValidDeviceAddress(messageBody.DeviceAddress))
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"设备地址格式不正确: {messageBody.DeviceAddress}", -107);
        }

        // Look up the device's cache entry in the device-info hash.
        var result = await FreeRedisProvider.Instance.EvalAsync
        (
            LUA_SCRIPT,
            new[] { RedisConst.CacheAllDeviceInfoHashKey },
            new[] { messageBody.DeviceAddress }
        );

        // One field was requested, so only the first slot of the reply is relevant.
        var deviceCacheInfo = result is object[] values && values.Length > 0 && values[0] != null
            ? ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(values[0] as string)
            : null;
        if (deviceCacheInfo == null)
        {
            _logger.LogError($"{nameof(QueryDeviceDataInfoAsync)} 业务系统单个查询设备数据,设备地址:{messageBody.DeviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备信息不存在", -108);
        }

        var pageResult = await treeModelService.OpenRequestDeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
        {
            DeviceAddress = messageBody.DeviceAddress,
            DeviceType = messageBody.DeviceType,
            IoTDataType = messageBody.IoTDataType,
            IsNeedPaging = false,
            StartCreationTime = messageBody.BeginTime,
            EndCreationTime = messageBody.EndTime,
            SubDeviceAddress = messageBody.SubDeviceAddress,
        });

        // TODO: filter by the time interval required by the business system.
        var queryResult = new List<IoTDBDynamicObject>();
        if (pageResult.Items != null && pageResult.Items.Count > 0)
        {
            queryResult.AddRange(pageResult.Items);
        }

        return HttpDataResultExtensions.Success(queryResult, "查询成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "查询单个设备数据时发生异常");
        return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "查询设备数据失败", -106);
    }
}
/// <summary>
/// Batch creation of devices for a business system. The request payload is parsed into a
/// <c>BatchCreateDeviceBusinessSystemInput</c>.
/// </summary>
/// <param name="input">Open API request carrying the device list, the device source type and the target IoT platform.</param>
/// <returns>
/// A success result when the platform-specific batch creation reports success. Otherwise a failed result:
/// the failure produced by <c>HandleOpenApiRequest</c>, -101 (no devices, or a device without an address),
/// -102 (source type is neither Prepay nor Energy), -104 (creation did not succeed, including platforms
/// other than CTWing/OneNET) or -106 (unexpected exception).
/// </returns>
[AllowAnonymous]
public async Task<HttpDataResult> BatchCreateDeviceInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<BatchCreateDeviceBusinessSystemInput>(input, serverApplicationOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed(handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing payload, an empty list, or any entry without an address is rejected up front;
        // otherwise the Trim() below would fail with a NullReferenceException.
        if (messageBody?.DeviceInfos == null
            || messageBody.DeviceInfos.Count <= 0
            || messageBody.DeviceInfos.Any(f => string.IsNullOrWhiteSpace(f?.DeviceAddress)))
        {
            _logger.LogError($"{nameof(BatchCreateDeviceInfoAsync)} 业务系统批量新增设备数据,设备地址不能为空,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed("设备地址不能为空", -101);
        }

        if (messageBody.DeviceSourceType != DeviceSourceTypeEnum.Prepay && messageBody.DeviceSourceType != DeviceSourceTypeEnum.Energy)
        {
            _logger.LogError($"{nameof(BatchCreateDeviceInfoAsync)} 业务系统批量创建设备信息,设备来源异常,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed("业务系统批量创建设备信息,设备来源异常。", -102);
        }

        var isCTWing = messageBody.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.CTWing;
        var isOneNET = messageBody.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.OneNET;

        // Any other platform leaves createResult false and is reported as a creation failure below.
        var createResult = false;
        if (isCTWing || isOneNET)
        {
            // NOTE(review): the aggregation input is mapped from the request envelope (input), not from the
            // parsed payload (messageBody) — confirm this is intended and not meant to be messageBody.Adapt.
            var batchCreateDeviceInput = input.Adapt<BatchCreateDeviceAggregationInput>();
            batchCreateDeviceInput.AddressList = messageBody.DeviceInfos.Select(f => f.DeviceAddress.Trim()).ToList();

            createResult = isCTWing
                ? await CTWingDeviceBatchCreateAsync(batchCreateDeviceInput)
                : await OneNETDeviceBatchCreateAsync(batchCreateDeviceInput);
        }

        if (createResult == false)
        {
            _logger.LogError($"{nameof(BatchCreateDeviceInfoAsync)} 业务系统批量新增设备数据没有成功,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed("新增设备失败", -104);
        }

        return HttpDataResultExtensions.Success("新增设备成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "批量新增设备数据时发生异常");
        return HttpDataResultExtensions.Failed("新增设备失败", -106);
    }
}
/// <summary>
/// Checks whether a device address is acceptable.
/// </summary>
/// <param name="deviceAddress">Device address to check.</param>
/// <returns>
/// <c>false</c> when the address is null, empty or whitespace, or contains "..", "*" or "~";
/// otherwise <c>true</c>.
/// </returns>
private bool IsValidDeviceAddress(string deviceAddress)
{
    if (string.IsNullOrWhiteSpace(deviceAddress))
    {
        return false;
    }

    // Simple deny-list of character sequences; adjust to the actual business rules as needed.
    string[] forbiddenTokens = { "..", "*", "~" };
    foreach (var token in forbiddenTokens)
    {
        if (deviceAddress.Contains(token))
        {
            return false;
        }
    }

    return true;
}
}
}