设备上下线事件逻辑优化
This commit is contained in:
parent
4172cfce13
commit
7ca6f529e8
@ -1,5 +1,5 @@
|
||||
CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
|
||||
CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
|
||||
|
||||
CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
|
||||
CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
|
||||
|
||||
CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';
|
||||
CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';
|
||||
@ -1,4 +1,6 @@
|
||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
using JiShe.ServicePro.Dto;
|
||||
|
||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
{
|
||||
/// <summary>
|
||||
/// 数据通讯通道订阅者服务
|
||||
@ -10,5 +12,12 @@
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
Task ServiceCommunicationDeviceStatusSubscriber();
|
||||
|
||||
/// <summary>
|
||||
/// 设备状态更新
|
||||
/// </summary>
|
||||
/// <param name="input"></param>
|
||||
/// <returns></returns>
|
||||
Task DeviceStatusUpdate(DeviceStatusMessage input);
|
||||
}
|
||||
}
|
||||
|
||||
@ -492,19 +492,19 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
{
|
||||
try
|
||||
{
|
||||
if(input.AddressList == null || input.AddressList.Count <= 0)
|
||||
if (input.AddressList == null || input.AddressList.Count <= 0)
|
||||
{
|
||||
throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表不能为空","-101");
|
||||
throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表不能为空", "-101");
|
||||
}
|
||||
|
||||
if (input.AddressList.Count >200)
|
||||
if (input.AddressList.Count > 200)
|
||||
{
|
||||
throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表数量不能超过200", "-102");
|
||||
}
|
||||
|
||||
var (deviceFirmwareVersionInfo, fileInfo) = await ValidateAndGetFirmwareInfoAsync(input.NowFirmwareVersionDataId, input.IoTPlatformProductId, "-103", "-104");
|
||||
|
||||
var deviceInfoList = await deviceAppService.FindByDeviceAddressAsync(new FindByDeviceAddressInput() { AddressList = input.AddressList ,IoTPlatform = input.IoTPlatform,IoTPlatformProductId = input.IoTPlatformProductId});
|
||||
var deviceInfoList = await deviceAppService.FindByDeviceAddressAsync(new FindByDeviceAddressInput() { AddressList = input.AddressList, IoTPlatform = input.IoTPlatform, IoTPlatformProductId = input.IoTPlatformProductId });
|
||||
|
||||
//如果是OneNET平台,提前获取产品信息和物模型信息,避免在循环中重复调用
|
||||
dynamic oneNETProductInfo = null;
|
||||
@ -524,7 +524,7 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
|
||||
foreach (var item in input.AddressList)
|
||||
{
|
||||
var deviceInfo = deviceInfoList.Where(d=>d.DeviceAddress== item).FirstOrDefault();
|
||||
var deviceInfo = deviceInfoList.Where(d => d.DeviceAddress == item).FirstOrDefault();
|
||||
if (deviceInfo == null)
|
||||
{
|
||||
deviceAddress.Add(item);
|
||||
@ -568,15 +568,21 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
{
|
||||
try
|
||||
{
|
||||
// Lua 脚本:获取并删除键(原子操作)
|
||||
// Lua 脚本:获取并删除键(原子操作)升级记录Id与文件ID关系缓存
|
||||
string cacheKey = string.Format($"{RedisConst.CacheDeviceUpgradeRecordDataKey}", input.Id);
|
||||
var redisResult = await FreeRedisProvider.Instance.GetDelAsync(cacheKey);
|
||||
var redisResult = await FreeRedisProvider.Instance.GetAsync(cacheKey);
|
||||
if (string.IsNullOrWhiteSpace(redisResult))
|
||||
{
|
||||
throw new UserFriendlyException($"{nameof(DownloadFirmwareInfoAsync)} 设备升级记录信息 {input.Id} 不存在");
|
||||
}
|
||||
|
||||
var downLoadResult = await fileAppService.AllowDownloadAsync(input);
|
||||
//将缓存中的文件ID转换成GUID
|
||||
Guid fileId = new Guid(redisResult);
|
||||
|
||||
var downLoadResult = await fileAppService.AllowDownloadAsync(new IdInput()
|
||||
{
|
||||
Id = fileId,
|
||||
});
|
||||
if (downLoadResult.ContentLength.HasValue && downLoadResult.ContentLength.Value > 0)
|
||||
{
|
||||
//更新状态为文件下载中
|
||||
@ -1100,10 +1106,10 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
|
||||
await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETUpgradeCommandIssuedEventName, packetTaskInfo));
|
||||
|
||||
//将升级记录数据Id存入Redis缓存中。
|
||||
//将升级记录数据Id作为key,文件数据ID作为值,存入Redis缓存中。
|
||||
string cacheKey = string.Format($"{RedisConst.CacheDeviceUpgradeRecordDataKey}", upgradeRecordInput.Id);
|
||||
|
||||
await FreeRedisProvider.Instance.SetAsync(cacheKey, upgradeRecordInput.Id);
|
||||
await FreeRedisProvider.Instance.SetAsync(cacheKey, fileObject.Id);
|
||||
|
||||
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo);
|
||||
|
||||
|
||||
@ -10,13 +10,13 @@ using System.Threading.Tasks;
|
||||
namespace JiShe.IoT.Jobs
|
||||
{
|
||||
/// <summary>
|
||||
/// 缓存设备数据到redis
|
||||
/// 设备服务定时任务
|
||||
/// </summary>
|
||||
public class CacheDeviceDataToRedisJob : SystemBackGroundWorkService
|
||||
public class DeviceServerJob : SystemBackGroundWorkService
|
||||
{
|
||||
private readonly ILogger<CacheDeviceDataToRedisJob> _logger;
|
||||
private readonly ILogger<DeviceServerJob> _logger;
|
||||
public readonly DeviceAppService _meterAppService;
|
||||
public CacheDeviceDataToRedisJob(ILogger<CacheDeviceDataToRedisJob> logger, DeviceAppService meterAppService) : base(logger)
|
||||
public DeviceServerJob(ILogger<DeviceServerJob> logger, DeviceAppService meterAppService) : base(logger)
|
||||
{
|
||||
_logger = logger;
|
||||
_meterAppService = meterAppService;
|
||||
@ -27,9 +27,11 @@ namespace JiShe.IoT.Jobs
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken"></param>
|
||||
/// <returns></returns>
|
||||
protected override Task DoWorkAsync(CancellationToken cancellationToken)
|
||||
protected override async Task DoWorkAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return _meterAppService.CacheDeviceDataToRedisAsync();
|
||||
//缓存数据到redis
|
||||
await _meterAppService.CacheDeviceDataToRedisAsync();
|
||||
await _meterAppService.CacheDeviceDataToRedisAsync();
|
||||
}
|
||||
|
||||
protected override TimeSpan GetInterval()
|
||||
@ -5,6 +5,8 @@ using JiShe.ServicePro.Dto;
|
||||
using JiShe.ServicePro.FreeRedisProvider;
|
||||
using JiShe.ServicePro.FreeSqlProvider;
|
||||
using Mapster;
|
||||
using Microsoft.Extensions.Caching.Memory;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.Auditing;
|
||||
@ -18,17 +20,21 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
|
||||
{
|
||||
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
||||
private readonly IReliableRedisPubSubService _redisPubSubService;
|
||||
private readonly IReliableRedisPubSubService _reliableRedisPubSubService;
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 设备状态统一订阅订阅服务
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="redisPubSubService"></param>
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService)
|
||||
/// <param name="serviceScopeFactory"></param>
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService, IServiceScopeFactory serviceScopeFactory)
|
||||
{
|
||||
_logger = logger;
|
||||
_redisPubSubService = redisPubSubService;
|
||||
_reliableRedisPubSubService = redisPubSubService;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -39,15 +45,18 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
{
|
||||
try
|
||||
{
|
||||
// 为订阅回调创建独立的 FreeSql 客户端
|
||||
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
||||
var callbackFreeSql = FreeRedisProvider;
|
||||
|
||||
|
||||
// 订阅频道
|
||||
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
|
||||
await _reliableRedisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
|
||||
{
|
||||
await HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql);
|
||||
using (var scope = _serviceScopeFactory.CreateScope())
|
||||
{
|
||||
var sp = scope.ServiceProvider;
|
||||
var scopedServiceCommunicationChannelSubscriberService = sp.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
|
||||
var scopedLogger = sp.GetRequiredService<ILogger<ServiceCommunicationChannelSubscriberService>>();
|
||||
scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}收到设备{message.DeviceAddress}上下线状态更新通知,开始更新{message.Status}状态处理...");
|
||||
await scopedServiceCommunicationChannelSubscriberService.DeviceStatusUpdate(message);
|
||||
scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}设备{message.DeviceAddress}上下线状态更新通知,结束更新{message.Status}状态处理...");
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@ -61,46 +70,44 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
/// <summary>
|
||||
/// 设备状态处理
|
||||
/// </summary>
|
||||
/// <param name="deviceStatusMessage"></param>
|
||||
/// <param name="callbackFreeSqlDbContext"></param>
|
||||
/// <param name="callbackFreeRedisProvider"></param>
|
||||
/// <param name="input"></param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="UserFriendlyException"></exception>
|
||||
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext, IFreeRedisProvider callbackFreeRedisProvider)
|
||||
public async Task DeviceStatusUpdate(DeviceStatusMessage input)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (deviceStatusMessage == null || callbackFreeSqlDbContext == null)
|
||||
if (input == null || input.DeviceAddress.IsNullOrWhiteSpace())
|
||||
{
|
||||
throw new UserFriendlyException($"设备状态消息处理失败,参数为空");
|
||||
}
|
||||
|
||||
var deviceEntity = await callbackFreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
|
||||
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||
.Where(d => d.IoTPlatform == input.IoTPlatform && d.DeviceAddress == input.DeviceAddress)
|
||||
.FirstAsync();
|
||||
if (deviceEntity == null)
|
||||
{
|
||||
throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
|
||||
throw new UserFriendlyException($"{input.DeviceAddress} 设备不存在");
|
||||
}
|
||||
|
||||
deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
|
||||
if (deviceStatusMessage.Status == DeviceOnlineStatus.Online)
|
||||
deviceEntity.DeviceOnlineStatus = input.Status;
|
||||
if (input.Status == DeviceOnlineStatus.Online)
|
||||
{
|
||||
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
}
|
||||
else if (deviceStatusMessage.Status == DeviceOnlineStatus.Offline)
|
||||
else if (input.Status == DeviceOnlineStatus.Offline)
|
||||
{
|
||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
}
|
||||
|
||||
var updateResult = await callbackFreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||
.SetSource(deviceEntity)
|
||||
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
|
||||
.ExecuteAffrowsAsync();
|
||||
|
||||
if (updateResult <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
|
||||
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{input.Serialize()}");
|
||||
|
||||
throw new UserFriendlyException($"设备状态更新失败");
|
||||
}
|
||||
@ -109,12 +116,12 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
DeviceCacheInfos deviceCacheInfos = deviceEntity.Adapt<DeviceCacheInfos>();
|
||||
deviceCacheInfos.PlatformPassword = null;
|
||||
|
||||
callbackFreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||
FreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||
}
|
||||
catch (Exception)
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
throw;
|
||||
_logger.LogError($"{nameof(DeviceStatusUpdate)} 设备在线状态更新发生异常:{ex.Message}");
|
||||
throw ;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user