226 lines
9.8 KiB
C#
226 lines
9.8 KiB
C#
using JiShe.IoT.BusinessSystemAggregation.Dto;
|
||
using JiShe.IoT.DeviceAggregation;
|
||
using JiShe.ServicePro;
|
||
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||
using JiShe.ServicePro.Core;
|
||
using JiShe.ServicePro.DataChannelManages;
|
||
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
|
||
using JiShe.ServicePro.Dto;
|
||
using JiShe.ServicePro.Encrypt;
|
||
using JiShe.ServicePro.Enums;
|
||
using JiShe.ServicePro.FreeRedisProvider;
|
||
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||
using JiShe.ServicePro.IoTDBManagement.TreeModels;
|
||
using Mapster;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using System.Collections.Generic;
|
||
using Volo.Abp.Auditing;
|
||
|
||
namespace JiShe.IoT.BusinessSystemAggregation
|
||
{
|
||
/// <summary>
/// Business-system aggregation service: the open-API surface that external
/// business systems call to issue device commands, query device data in
/// batches and register devices in batches.
/// </summary>
[DisableAuditing]
public class BusinessSystemAggregationService(
    IOptions<ServerApplicationOptions> options,
    IReliableRedisPubSubService redisPubSubService,
    IDeviceAppService deviceAppService,
    IIoTDBDataChannelManageService ioTDBDataChannelManageService,
    IOptions<IoTDBOptions> _ioTDBOptions,
    ITreeModelService treeModelService,
    IDeviceAggregationService deviceAggregationService,
    ILogger<BusinessSystemAggregationService> _logger) : IoTAppService, IBusinessSystemAggregationService
{
    // Option snapshots are resolved once at construction time and never reassigned.
    private readonly ServerApplicationOptions serverOptions = options.Value;
    private readonly IoTDBOptions ioTDBOptions = _ioTDBOptions.Value;

    /// <summary>
    /// Receives a device command from a business system and forwards it to the
    /// channel that matches the device's IoT platform.
    /// </summary>
    /// <param name="input">
    /// Open-API request envelope. It is unpacked into a <c>ReceiveCommandInfoDto</c>
    /// by <c>HandleOpenApiRequest</c>, which is defined outside this class.
    /// </param>
    /// <returns>
    /// A success result once the command has been published. Otherwise a failed
    /// result: the unpacking failure as-is, -104 when the source type is not
    /// <c>BusinessSystem</c>, -1041 when the device is unknown, -105 when the
    /// device platform is neither OneNET nor CTWing, and -106 when an exception
    /// is thrown.
    /// </returns>
    [AllowAnonymous]
    public async Task<HttpDataResult> ReceiveCommandInfoAsync(OpenApiRequest input)
    {
        try
        {
            var handleResult = HandleOpenApiRequest<ReceiveCommandInfoDto>(input, serverOptions);
            if (handleResult.Success == false)
            {
                return handleResult;
            }

            var messageBody = handleResult.Data;

            // NOTE(review): this value is never assigned, so the packet task info
            // below always receives a null message body. Confirm that is intended.
            string tempMessageBody = null;

            // Only commands that originate from a business system are accepted.
            if (messageBody.SourceType != DeviceTelemetrySourceTypeEnum.BusinessSystem)
            {
                return HttpDataResultExtensions.Failed("设备指令来源类型错误,业务系统传固定值2", -104, ResponeResultEnum.Fail);
            }

            var deviceInfo = await deviceAppService.FindByDeviceAddressAsync(messageBody.DeviceAddress);
            if (deviceInfo == null)
            {
                return HttpDataResultExtensions.Failed("设备不存在", -1041, ResponeResultEnum.Fail);
            }

            // NOTE(review): built for every platform but only consumed by the OneNET
            // branch. Kept here so the original evaluation order is preserved.
            var packetTaskInfo = GetDeviceTelemetryPacketTaskInfo(ioTDBOptions, input, deviceInfo.Adapt<DeviceCacheInfos>(), tempMessageBody);

            if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
            {
                // OneNET: write the task record to the telemetry-task data channel,
                // then publish the command-issued event.
                await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(
                    DataChannelManage.DeviceTelemetryTaskDataChannel.Writer,
                    (DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo));

                await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
            }
            else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
            {
                // CTWing: only the command-issued event is published.
                await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input);
            }
            else
            {
                return HttpDataResultExtensions.Failed("指令处理失败,当前设备平台类型异常", -105);
            }

            return HttpDataResultExtensions.Success("指令下发成功");
        }
        catch (Exception ex)
        {
            // Log with the exception object so the stack trace is not lost. The
            // caller only receives the message text.
            _logger.LogError(ex, "{Method} 接收业务系统指令信息发生异常", nameof(ReceiveCommandInfoAsync));
            return HttpDataResultExtensions.Failed($"指令处理失败,发送异常:{ex.Message}", -106);
        }
    }

    /// <summary>
    /// Batch query of device data for a business system. The request message is
    /// unpacked into a <c>BatchQueryDeviceDataInfoInput</c>.
    /// </summary>
    /// <param name="input">Open-API request envelope.</param>
    /// <returns>
    /// On success, the concatenated data rows of every requested device that has
    /// an entry in the device cache hash. Devices without a cache entry are
    /// logged and skipped. On failure: the unpacking failure, -103 when no device
    /// address is supplied, or -106 when an exception is thrown.
    /// </returns>
    [AllowAnonymous]
    public async Task<HttpDataResult<List<IoTDBDynamicObject>>> BatchQueryDeviceDataInfoAsync(OpenApiRequest input)
    {
        try
        {
            var handleResult = HandleOpenApiRequest<BatchQueryDeviceDataInfoInput>(input, serverOptions);
            if (handleResult.Success == false)
            {
                return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, handleResult.Message, handleResult.LocationCode);
            }

            var messageBody = handleResult.Data;

            if (messageBody.DeviceAddresses == null || messageBody.DeviceAddresses.Count <= 0)
            {
                return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, "设备地址不能为空", -103);
            }

            // Lua script: fetch all requested fields of the hash in one HMGET call.
            string luaScript = @"
                local hashKey = KEYS[1]
                local fieldKeys = ARGV
                return redis.call('HMGET', hashKey, unpack(fieldKeys))";

            // Run the script against the hash that caches all device info.
            var result = await FreeRedisProvider.Instance.EvalAsync
            (
                luaScript,
                new[] { RedisConst.CacheAllDeviceInfoHashKey },
                messageBody.DeviceAddresses.ToArray()
            );

            List<DeviceCacheInfos> deviceCacheInfos = new List<DeviceCacheInfos>();

            // Convert the reply into cache entries.
            if (result is object[] values)
            {
                foreach (var value in values)
                {
                    // HMGET returns nil for fields that are not in the hash. Skip
                    // those so a single unknown address cannot break the lookup
                    // below for every other device.
                    if (value is not string json || string.IsNullOrWhiteSpace(json))
                    {
                        continue;
                    }

                    var cacheInfo = ServiceProJsonSerializer.Deserialize<DeviceCacheInfos>(json);
                    if (cacheInfo != null)
                    {
                        deviceCacheInfos.Add(cacheInfo);
                    }
                }
            }

            List<IoTDBDynamicObject> queryResult = new List<IoTDBDynamicObject>();
            foreach (var deviceAddress in messageBody.DeviceAddresses)
            {
                var deviceCacheInfo = deviceCacheInfos.FirstOrDefault(x => x.DeviceAddress == deviceAddress);

                if (deviceCacheInfo == null)
                {
                    _logger.LogError($"{nameof(BatchQueryDeviceDataInfoAsync)} 业务系统批量查询设备数据,设备地址:{deviceAddress}未找到设备地址缓存信息,消息体为:{input.Serialize()}");
                    continue;
                }

                var pageResult = await treeModelService.OpenRequestDeviceDataInfoPageAsync(new DeviceTreeModelDataInfoInput()
                {
                    DeviceAddress = deviceAddress,
                    DeviceType = messageBody.DeviceType,
                    IoTDataType = messageBody.IoTDataType,
                    IsNeedPaging = false,
                    StartCreationTime = messageBody.BeginTime,
                    EndCreationTime = messageBody.EndTime
                });

                // TODO: filter rows by the time interval the business system asks for.
                if (pageResult.Items != null && pageResult.Items.Count > 0)
                {
                    queryResult.AddRange(pageResult.Items);
                }
            }

            return HttpDataResultExtensions.Success(queryResult, "查询成功");
        }
        catch (Exception ex)
        {
            // Log with the exception object so the stack trace is not lost.
            _logger.LogError(ex, "{Method} 业务系统批量查询设备数据发生异常", nameof(BatchQueryDeviceDataInfoAsync));
            return HttpDataResultExtensions.Failed<List<IoTDBDynamicObject>>(null, $"查询设备数据失败,发送异常:{ex.Message}", -106);
        }
    }

    /// <summary>
    /// Batch creation of devices for a business system. The request message is
    /// unpacked into a <c>BatchCreateDeviceInfoInput</c>.
    /// </summary>
    /// <param name="input">Open-API request envelope.</param>
    /// <returns>
    /// A success result when the aggregation service reports the batch as created.
    /// Otherwise a failed result: the unpacking failure, -101 when no device
    /// address is supplied, -102 when creation reports failure, or -106 when an
    /// exception is thrown.
    /// </returns>
    [AllowAnonymous]
    public async Task<HttpDataResult> BatchCreateDeviceInfoAsync(OpenApiRequest input)
    {
        try
        {
            var handleResult = HandleOpenApiRequest<BatchCreateDeviceInfoInput>(input, serverOptions);
            if (handleResult.Success == false)
            {
                return HttpDataResultExtensions.Failed(handleResult.Message, handleResult.LocationCode);
            }

            var messageBody = handleResult.Data;

            if (messageBody.DeviceAddresses == null || messageBody.DeviceAddresses.Count <= 0)
            {
                return HttpDataResultExtensions.Failed("设备地址不能为空", -101);
            }

            var createResult = await deviceAggregationService.BatchCreateDeviceBusinessSystemAsync(new BatchCreateDeviceAggregationInput()
            {
                DeviceSourceTypeEnum = messageBody.DeviceSourceType,
                IoTPlatform = messageBody.IoTPlatform,
                IoTPlatformProductId = messageBody.IoTPlatformProductId,
                AddressList = messageBody.DeviceAddresses
            });

            if (createResult == false)
            {
                _logger.LogError($"{nameof(BatchCreateDeviceInfoAsync)} 业务系统批量新增设备数据没有成功,消息体为:{input.Serialize()}");
                return HttpDataResultExtensions.Failed("新增设备失败", -102);
            }

            return HttpDataResultExtensions.Success("新增设备成功");
        }
        catch (Exception ex)
        {
            // Log with the exception object so the stack trace is not lost. The
            // returned message now names the create operation instead of the
            // copy-pasted "query failed" text.
            _logger.LogError(ex, "{Method} 业务系统批量新增设备数据发生异常", nameof(BatchCreateDeviceInfoAsync));
            return HttpDataResultExtensions.Failed($"新增设备失败,发送异常:{ex.Message}", -106);
        }
    }
}
|
||
}
|