优化IoTDB时间戳展示,实现设备在线状态以及时间及时更新。
This commit is contained in:
parent
8d325a57d8
commit
5e60436026
@ -1 +1 @@
|
|||||||
Subproject commit d88da5e29442a906e596baa24e262f9c4a6576fb
|
Subproject commit 8471b53a3f0e9f2af02b5122ace19d1d5f04a6ec
|
||||||
@ -1,5 +1,3 @@
|
|||||||
using Serilog.Events;
|
|
||||||
|
|
||||||
namespace JiShe.IoT;
|
namespace JiShe.IoT;
|
||||||
|
|
||||||
public class Program
|
public class Program
|
||||||
|
|||||||
@ -0,0 +1,18 @@
|
|||||||
|
using JiShe.ServicePro.Dto;
|
||||||
|
using JiShe.ServicePro.Kafka.Internal;
|
||||||
|
|
||||||
|
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 数据通讯通道订阅者服务
|
||||||
|
/// </summary>
|
||||||
|
public interface IServiceCommunicationChannelSubscriberService
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 设备上下线通知Kafka消息订阅
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -12,6 +12,7 @@ using Mapster;
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.IO.Pipelines;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
@ -113,22 +114,7 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateDeviceInput updateDeviceInput = insertResult.Adapt<UpdateDeviceInput>();
|
await DeviceUpdateHandler(insertResult, pushResult);
|
||||||
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
|
|
||||||
updateDeviceInput.IsPlatformPushSuccess = true;
|
|
||||||
|
|
||||||
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
|
|
||||||
if (updateResult == null)
|
|
||||||
{
|
|
||||||
logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
//设备数据缓存到Redis
|
|
||||||
DeviceCacheInfos deviceCacheInfos = insertResult.Adapt<DeviceCacheInfos>();
|
|
||||||
deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse;
|
|
||||||
|
|
||||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, insertResult.DeviceAddress, deviceCacheInfos);
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -183,14 +169,62 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var entityDevice = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>().Where(f => f.Id == input.Id).FirstAsync();
|
var entityDevice = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>().Where(f => f.Id == input.Id).FirstAsync<DeviceManagementInfoDto>();
|
||||||
if (entityDevice == null)
|
if (entityDevice == null)
|
||||||
{
|
{
|
||||||
throw new UserFriendlyException($"推送失败,未找到设备数据");
|
throw new UserFriendlyException($"推送失败,未找到设备数据");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.CTWing)
|
||||||
|
{
|
||||||
|
return await RepushDeviceInfoToCTWing(entityDevice);
|
||||||
|
}
|
||||||
|
else if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.OneNET)
|
||||||
|
{
|
||||||
|
return await RepushDeviceInfoToOneNET(entityDevice);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new UserFriendlyException($"推送失败,异常的物联网平台");
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 重新推送设备信息到CTWing物联网平台
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="UserFriendlyException"></exception>
|
||||||
|
public async Task<DeviceManagementInfoDto> RepushDeviceInfoToCTWing(DeviceManagementInfoDto input)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
throw new UserFriendlyException($"推送失败,CTWing暂未实现。");
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 重新推送设备信息到OneNET物联网平台
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="UserFriendlyException"></exception>
|
||||||
|
public async Task<DeviceManagementInfoDto> RepushDeviceInfoToOneNET(DeviceManagementInfoDto input)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
|
||||||
var productInfo = await FreeSqlDbContext.Instance.Select<OneNETProductInfos>()
|
var productInfo = await FreeSqlDbContext.Instance.Select<OneNETProductInfos>()
|
||||||
.Where(e => e.IsEnabled == true && e.IoTPlatformProductId == entityDevice.IoTPlatformProductId)
|
.Where(e => e.IsEnabled == true && e.IoTPlatformProductId == input.IoTPlatformProductId)
|
||||||
.FirstAsync();
|
.FirstAsync();
|
||||||
|
|
||||||
if (productInfo == null)
|
if (productInfo == null)
|
||||||
@ -202,10 +236,10 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
//推送至OneNET平台
|
//推送至OneNET平台
|
||||||
var pushResult = await oneNETDeviceService.CreateDeviceInfoAsync(new CreateDeviceInfoInput()
|
var pushResult = await oneNETDeviceService.CreateDeviceInfoAsync(new CreateDeviceInfoInput()
|
||||||
{
|
{
|
||||||
DeviceName = entityDevice.IoTPlatformDeviceOpenInfo,
|
DeviceName = input.IoTPlatformDeviceOpenInfo,
|
||||||
ProductId = productInfo.IoTPlatformProductId,
|
ProductId = productInfo.IoTPlatformProductId,
|
||||||
OneNETAccountId = productInfo.OneNETAccountId,
|
OneNETAccountId = productInfo.OneNETAccountId,
|
||||||
Description = entityDevice.DeviceAddress,
|
Description = input.DeviceAddress,
|
||||||
});
|
});
|
||||||
|
|
||||||
if (pushResult == null || pushResult.Code != ServicePro.Enums.ResponeResultEnum.Success)
|
if (pushResult == null || pushResult.Code != ServicePro.Enums.ResponeResultEnum.Success)
|
||||||
@ -215,8 +249,27 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateDeviceInput updateDeviceInput = entityDevice.Adapt<UpdateDeviceInput>();
|
return await DeviceUpdateHandler(input, pushResult);
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 更新设备信息并处理缓存
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="input"></param>
|
||||||
|
/// <param name="pushResult"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="UserFriendlyException"></exception>
|
||||||
|
private async Task<DeviceManagementInfoDto> DeviceUpdateHandler(DeviceManagementInfoDto input, HttpDataResult pushResult)
|
||||||
|
{
|
||||||
|
UpdateDeviceInput updateDeviceInput = input.Adapt<UpdateDeviceInput>();
|
||||||
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
|
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
|
||||||
|
updateDeviceInput.IsPlatformPushSuccess = true;
|
||||||
|
|
||||||
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
|
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
|
||||||
if (updateResult == null)
|
if (updateResult == null)
|
||||||
@ -226,17 +279,13 @@ namespace JiShe.IoT.DeviceAggregation
|
|||||||
}
|
}
|
||||||
|
|
||||||
//设备数据缓存到Redis
|
//设备数据缓存到Redis
|
||||||
DeviceCacheInfos deviceCacheInfos = entityDevice.Adapt<DeviceCacheInfos>();
|
DeviceCacheInfos deviceCacheInfos = input.Adapt<DeviceCacheInfos>();
|
||||||
deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse;
|
deviceCacheInfos.IoTPlatformResponse = null;
|
||||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, entityDevice.DeviceAddress, deviceCacheInfos);
|
deviceCacheInfos.PlatformPassword = null;
|
||||||
|
|
||||||
return entityDevice.Adapt<DeviceManagementInfoDto>();
|
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos);
|
||||||
}
|
|
||||||
catch (Exception)
|
|
||||||
{
|
|
||||||
|
|
||||||
throw;
|
return input.Adapt<DeviceManagementInfoDto>();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,95 @@
|
|||||||
|
using JiShe.IoT;
|
||||||
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||||
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||||||
|
using JiShe.ServicePro.Consts;
|
||||||
|
using JiShe.ServicePro.Core;
|
||||||
|
using JiShe.ServicePro.DataChannelManages;
|
||||||
|
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
|
||||||
|
using JiShe.ServicePro.Dto;
|
||||||
|
using JiShe.ServicePro.Enums;
|
||||||
|
using JiShe.ServicePro.FreeRedisProvider;
|
||||||
|
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||||||
|
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
||||||
|
using JiShe.ServicePro.Kafka.Attributes;
|
||||||
|
using JiShe.ServicePro.Kafka.Consts;
|
||||||
|
using JiShe.ServicePro.Kafka.Internal;
|
||||||
|
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
|
||||||
|
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
|
||||||
|
using JiShe.ServicePro.ServerOptions;
|
||||||
|
using Mapster;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using Volo.Abp;
|
||||||
|
using Volo.Abp.Auditing;
|
||||||
|
|
||||||
|
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 设备状态统一订阅订阅服务
|
||||||
|
/// </summary>
|
||||||
|
[DisableAuditing]
|
||||||
|
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
|
||||||
|
{
|
||||||
|
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
||||||
|
private readonly IoTDBOptions _iotDBOptions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备状态统一订阅订阅服务
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="logger"></param>
|
||||||
|
/// <param name="iotDBOptions"></param>
|
||||||
|
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_iotDBOptions = iotDBOptions.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备上下线通知Kafka消息订阅
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)]
|
||||||
|
public async Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||||
|
.Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress)
|
||||||
|
.FirstAsync();
|
||||||
|
if (deviceEntity == null)
|
||||||
|
{
|
||||||
|
return SubscribeAck.Fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
deviceEntity.DeviceOnlineStatus = message.Status;
|
||||||
|
if (message.Status == DeviceOnlineStatusEnum.Online)
|
||||||
|
{
|
||||||
|
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local);
|
||||||
|
}
|
||||||
|
else if (message.Status == DeviceOnlineStatusEnum.Offline)
|
||||||
|
{
|
||||||
|
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||||
|
}
|
||||||
|
|
||||||
|
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||||
|
.SetSource(deviceEntity)
|
||||||
|
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
|
||||||
|
.ExecuteAffrowsAsync();
|
||||||
|
|
||||||
|
if (updateResult <=0)
|
||||||
|
{
|
||||||
|
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}");
|
||||||
|
return SubscribeAck.Fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
return SubscribeAck.Success();
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user