388 lines
18 KiB
C#
Raw Normal View History

using JiShe.IoT.BusinessSystemAggregation.Dto;
2025-10-11 17:06:32 +08:00
using JiShe.IoT.DeviceAggregation;
using JiShe.ServicePro;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
2025-09-19 11:36:17 +08:00
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
2025-08-04 17:29:47 +08:00
using JiShe.ServicePro.Core;
2025-09-19 11:36:17 +08:00
using JiShe.ServicePro.DataChannelManages;
2025-08-04 17:29:47 +08:00
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.DeviceManagement.ThingModels;
2025-08-04 17:29:47 +08:00
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Encrypt;
using JiShe.ServicePro.Enums;
2025-08-21 11:34:16 +08:00
using JiShe.ServicePro.FreeRedisProvider;
2025-09-19 11:36:17 +08:00
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TreeModels;
2025-09-19 11:36:17 +08:00
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Generic;
using Volo.Abp;
2025-10-27 17:31:37 +08:00
using Volo.Abp.Auditing;
namespace JiShe.IoT.BusinessSystemAggregation
{
/// <summary>
/// Business system aggregation service.
/// </summary>
[DisableAuditing]
public class BusinessSystemAggregationService(IOptions<ServerApplicationOptions> options, IReliableRedisPubSubService redisPubSubService, IDeviceAppService deviceAppService, IIoTDBDataChannelManageService ioTDBDataChannelManageService, IOptions<IoTDBOptions> _ioTDBOptions, ITreeModelService treeModelService, IDeviceAggregationService deviceAggregationService, ILogger<BusinessSystemAggregationService> _logger, IIoTPlatformThingModelInfoAppService platformThingModelInfoAppService) : IoTAppService, IBusinessSystemAggregationService
{
/// <summary>
/// Server application options, read once from the injected options wrapper.
/// Passed to <c>HandleOpenApiRequest</c> by every open API entry point in this class.
/// </summary>
private readonly ServerApplicationOptions serverOptions = options.Value;

/// <summary>
/// IoTDB options, read once from the injected options wrapper.
/// Used when building the telemetry packet task for an issued command.
/// </summary>
private readonly IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;

/// <summary>
/// Lua script that runs a single HMGET against the hash named by KEYS[1],
/// requesting every field passed in ARGV.
/// </summary>
private const string LUA_SCRIPT = @"
local hashKey = KEYS[1]
local fieldKeys = ARGV
return redis.call('HMGET', hashKey, unpack(fieldKeys))";
/// <summary>
/// Receives a device command pushed by a business system and forwards it to the
/// command channel of the IoT platform the device belongs to.
/// </summary>
/// <remarks>
/// Failure codes returned in the result:
/// -103 empty command set, -104 wrong source type, -1041 unknown device,
/// -105 unsupported IoT platform, -106 unexpected exception.
/// A failed <c>HandleOpenApiRequest</c> result is returned to the caller unchanged.
/// </remarks>
/// <param name="input">Open API request; it is unpacked into a <see cref="ReceiveCommandInfoDto"/>.</param>
/// <returns>A success result once the command has been published, otherwise a failure result.</returns>
/// <exception cref="UserFriendlyException">
/// Thrown for OneNET devices when the platform thing model is missing, a command key is not a
/// known property identifier, or the command is a firmware-upgrade / special command.
/// </exception>
[AllowAnonymous]
public async Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<ReceiveCommandInfoDto>(input, serverOptions);
        if (handleResult.Success == false)
        {
            return handleResult;
        }

        var messageBody = handleResult.Data;
        if (messageBody == null || messageBody.Commands == null || messageBody.Commands.Count <= 0)
        {
            return HttpDataResultExtensions.Failed("设备指令不能为空", -103, ResponeResultEnum.Fail);
        }

        // The source type must be "business system".
        if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
        {
            return HttpDataResultExtensions.Failed("设备指令来源类型错误,业务系统传固定值2", -104, ResponeResultEnum.Fail);
        }

        var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
        if (deviceInfo == null)
        {
            return HttpDataResultExtensions.Failed("设备不存在", -1041, ResponeResultEnum.Fail);
        }

        var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt<DeviceCacheInfos>(), messageBody.Commands.Serialize());

        // Store the command (IoTDB task channel) and publish it on the Redis channel of the device's platform.
        if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
        {
            // Load the platform-side thing model of the device's product and reject any
            // command key that is not a valid property identifier.
            var platformThingModelInfo = await platformThingModelInfoAppService.FindByPlatformProductIdAsync(new IdInput<string>() { Id = deviceInfo.IoTPlatformProductId });
            if (platformThingModelInfo == null)
            {
                throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}的平台端物模型信息不存在。");
            }

            // Loop-invariant: depends only on the device, not on the individual command.
            var specialCommandForbidden = deviceInfo.IsNeedConfigDeviceModel && deviceInfo.DeviceThingModelDataId.HasValue;

            foreach (var item in messageBody.Commands)
            {
                var thingModelProperty = platformThingModelInfo.FirstOrDefault(d => d.IoTPlatformRawFieldName == item.Key);
                if (thingModelProperty == null)
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型信息不存在属性标识符{item.Key}。");
                }

                // Firmware-upgrade commands are not allowed through this entry point.
                // string.Equals is null-safe, so a property without a standard field name is simply not a match.
                if (string.Equals(thingModelProperty.StandardFieldName, ThingModelFixedTypeConst.FIRMWARE_UPGRADE, StringComparison.OrdinalIgnoreCase))
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是升级指令操作,被禁止。");
                }

                if (specialCommandForbidden && string.Equals(item.Key, ThingModelFixedTypeConst.SpecialCommand, StringComparison.OrdinalIgnoreCase))
                {
                    throw new UserFriendlyException($"业务系统推送指令时设备{deviceInfo.DeviceAddress}平台端物模型属性标识符{item.Key}是特殊指令操作,被禁止。");
                }
            }

            // Write the packet into the telemetry task data channel, then publish the command.
            await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));
            await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
        }
        else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
        {
            await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input);
        }
        else
        {
            return HttpDataResultExtensions.Failed("指令处理失败,当前设备平台类型异常", -105);
        }

        return HttpDataResultExtensions.Success("指令下发成功");
    }
    catch (UserFriendlyException)
    {
        // Validation errors are surfaced to the caller as-is.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "接收业务系统指令信息时发生异常");
        return HttpDataResultExtensions.Failed("指令处理失败,发送异常", -106);
    }
}
/// <summary>
/// Batch query of device data for a business system.
/// The request's Msg field carries a <see cref="BatchQueryDeviceDataInfoInput"/>.
/// </summary>
/// <remarks>
/// Failure codes returned in the result: -103 no device addresses (or no message body),
/// -107 malformed device address, -106 unexpected exception. A failed
/// <c>HandleOpenApiRequest</c> result is passed through with its own message and code.
/// Addresses without a cached device entry are logged and skipped rather than failing the batch.
/// </remarks>
/// <param name="input">Open API request.</param>
/// <returns>The concatenated data rows of all devices that were found in the cache.</returns>
[AllowAnonymous]
public async Task<HttpDataResult<List<IoTDBDynamicObject>>> BatchQueryDeviceDataInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<BatchQueryDeviceDataInfoInput>(input, serverOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing body is reported as a validation error instead of surfacing as a NullReferenceException.
        if (messageBody == null || messageBody.DeviceAddresses == null || messageBody.DeviceAddresses.Count <= 0)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
        }

        // Validate every address before it is used in a query.
        foreach (var deviceAddress in messageBody.DeviceAddresses)
        {
            if (!IsValidDeviceAddress(deviceAddress))
            {
                return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"设备地址格式不正确: {deviceAddress}", -107);
            }
        }

        // Fetch the cached device entries for all addresses with a single script call.
        var result = await FreeRedisProvider.Instance.EvalAsync
        (
            LUA_SCRIPT,
            new[] { RedisConst.CacheAllDeviceInfoHashKey },
            messageBody.DeviceAddresses.ToArray()
        );

        var deviceCacheInfos = new List<DeviceCacheInfos>();
        if (result is object[] values)
        {
            foreach (var value in values)
            {
                // Misses are stored as null so that indexes stay aligned with DeviceAddresses.
                deviceCacheInfos.Add(value is string json
                    ? ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(json)
                    : null);
            }
        }

        var queryResult = new List<IoTDBDynamicObject>();
        for (int i = 0; i < messageBody.DeviceAddresses.Count; i++)
        {
            var deviceAddress = messageBody.DeviceAddresses[i];
            var deviceCacheInfo = deviceCacheInfos.Count > i ? deviceCacheInfos[i] : null;
            if (deviceCacheInfo == null)
            {
                _logger.LogError($"{nameof(BatchQueryDeviceDataInfoAsync)} 业务系统批量查询设备数据,设备地址:{deviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
                continue;
            }

            var pageResult = await treeModelService.OpenRequestDeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
            {
                DeviceAddress = deviceAddress,
                DeviceType = messageBody.DeviceType,
                IoTDataType = messageBody.IoTDataType,
                IsNeedPaging = false,
                StartCreationTime = messageBody.BeginTime,
                EndCreationTime = messageBody.EndTime
            });

            // TODO: filter according to the time-interval requirement of the business system.
            if (pageResult.Items != null && pageResult.Items.Count > 0)
            {
                queryResult.AddRange(pageResult.Items);
            }
        }

        return HttpDataResultExtensions.Success(queryResult, "查询成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "批量查询设备数据时发生异常");
        return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "查询设备数据失败", -106);
    }
}
2026-01-14 11:56:18 +08:00
/// <summary>
/// Single-device data query for a business system.
/// The request's Msg field carries a <see cref="QueryDeviceDataInfoInput"/>.
/// </summary>
/// <remarks>
/// Failure codes returned in the result: -103 empty device address (or no message body),
/// -107 malformed device address, -108 device not found in the cache, -106 unexpected exception.
/// A failed <c>HandleOpenApiRequest</c> result is passed through with its own message and code.
/// </remarks>
/// <param name="input">Open API request.</param>
/// <returns>The data rows of the requested device.</returns>
[AllowAnonymous]
public async Task<HttpDataResult<List<IoTDBDynamicObject>>> QueryDeviceDataInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<QueryDeviceDataInfoInput>(input, serverOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing body is reported as a validation error instead of surfacing as a NullReferenceException.
        if (messageBody == null || string.IsNullOrWhiteSpace(messageBody.DeviceAddress))
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
        }

        // Validate the address before it is used in a query.
        if (!IsValidDeviceAddress(messageBody.DeviceAddress))
        {
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"设备地址格式不正确: {messageBody.DeviceAddress}", -107);
        }

        // Fetch the cached device entry for the address.
        var result = await FreeRedisProvider.Instance.EvalAsync
        (
            LUA_SCRIPT,
            new[] { RedisConst.CacheAllDeviceInfoHashKey },
            new[] { messageBody.DeviceAddress }
        );

        // One field was requested, so only the first reply element is relevant.
        DeviceCacheInfos deviceCacheInfo = null;
        if (result is object[] values && values.Length > 0 && values[0] is string json)
        {
            deviceCacheInfo = ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(json);
        }

        if (deviceCacheInfo == null)
        {
            _logger.LogError($"{nameof(QueryDeviceDataInfoAsync)} 业务系统单个查询设备数据,设备地址:{messageBody.DeviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备信息不存在", -108);
        }

        var pageResult = await treeModelService.OpenRequestDeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
        {
            DeviceAddress = messageBody.DeviceAddress,
            DeviceType = messageBody.DeviceType,
            IoTDataType = messageBody.IoTDataType,
            IsNeedPaging = false,
            StartCreationTime = messageBody.BeginTime,
            EndCreationTime = messageBody.EndTime,
            SubDeviceAddress = messageBody.SubDeviceAddress,
        });

        // TODO: filter according to the time-interval requirement of the business system.
        var queryResult = new List<IoTDBDynamicObject>();
        if (pageResult.Items != null && pageResult.Items.Count > 0)
        {
            queryResult.AddRange(pageResult.Items);
        }

        return HttpDataResultExtensions.Success(queryResult, "查询成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "查询单个设备数据时发生异常");
        return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "查询设备数据失败", -106);
    }
}
2025-10-11 17:06:32 +08:00
/// <summary>
/// Batch creation of devices for a business system.
/// The request's Msg field carries a <see cref="BatchCreateDeviceBusinessSystemInput"/>.
/// </summary>
/// <remarks>
/// Failure codes returned in the result: -101 no device infos (or no message body),
/// -102 the aggregation service reported failure, -106 unexpected exception.
/// A failed <c>HandleOpenApiRequest</c> result is passed through with its own message and code.
/// </remarks>
/// <param name="input">Open API request.</param>
/// <returns>A success result when the devices were created, otherwise a failure result.</returns>
[AllowAnonymous]
public async Task<HttpDataResult> BatchCreateDeviceInfoAsync(OpenApiRequest input)
{
    try
    {
        var handleResult = HandleOpenApiRequest<BatchCreateDeviceBusinessSystemInput>(input, serverOptions);
        if (handleResult.Success == false)
        {
            return HttpDataResultExtensions.Failed(handleResult.Message, handleResult.LocationCode);
        }

        var messageBody = handleResult.Data;
        // A missing body is reported as a validation error instead of surfacing as a NullReferenceException.
        if (messageBody == null || messageBody.DeviceInfos == null || messageBody.DeviceInfos.Count <= 0)
        {
            return HttpDataResultExtensions.Failed("设备地址不能为空", -101);
        }

        // Create the devices in one batch.
        var createResult = await deviceAggregationService.BatchCreateDeviceBusinessSystemAsync(messageBody);
        if (createResult == false)
        {
            _logger.LogError($"{nameof(BatchCreateDeviceInfoAsync)} 业务系统批量新增设备数据没有成功,消息体为:{input.Serialize()}");
            return HttpDataResultExtensions.Failed("新增设备失败", -102);
        }

        return HttpDataResultExtensions.Success("新增设备成功");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "批量新增设备数据时发生异常");
        return HttpDataResultExtensions.Failed("新增设备失败", -106);
    }
}
/// <summary>
/// Checks whether a device address is acceptable for use in a query.
/// </summary>
/// <remarks>
/// This is a simple deny-list: the address must be non-blank and must not contain
/// "..", "*" or "~". Adjust the rules to the actual business format if needed.
/// </remarks>
/// <param name="deviceAddress">Device address to check.</param>
/// <returns><c>true</c> when the address passes the checks; otherwise <c>false</c>.</returns>
private bool IsValidDeviceAddress(string deviceAddress)
{
    if (string.IsNullOrWhiteSpace(deviceAddress))
    {
        return false;
    }

    // Character sequences that are never allowed inside an address.
    string[] forbiddenTokens = { "..", "*", "~" };
    foreach (var token in forbiddenTokens)
    {
        if (deviceAddress.Contains(token))
        {
            return false;
        }
    }

    return true;
}
}
}