diff --git a/JiShe.ServicePro b/JiShe.ServicePro index 3dbd0eb..b61e2d1 160000 --- a/JiShe.ServicePro +++ b/JiShe.ServicePro @@ -1 +1 @@ -Subproject commit 3dbd0ebf5fff82ae03945631aa8e401620931ab1 +Subproject commit b61e2d1f76755f3763fcbc1256abd2d75c94f7a9 diff --git a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs index a0df610..66cc594 100644 --- a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs @@ -1,7 +1,4 @@ -using JiShe.ServicePro.Dto; -using JiShe.ServicePro.Kafka.Internal; - -namespace JiShe.ServicePro.OneNETManagement.Subscribers +namespace JiShe.ServicePro.OneNETManagement.Subscribers { /// /// 数据通讯通道订阅者服务 @@ -9,10 +6,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers public interface IServiceCommunicationChannelSubscriberService { /// - /// 设备上下线通知Kafka消息订阅 - /// - /// + /// 设备上下线通知 Redis 消息订阅 + /// /// - Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message); + Task ServiceCommunicationDeviceStatusSubscriber(); } } diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index 2a5b3f1..60fd3d8 100644 --- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -25,51 +25,67 @@ using Volo.Abp.Auditing; namespace JiShe.ServicePro.OneNETManagement.Subscribers { /// - /// 设备状态统一订阅订阅服务 + /// 数据通讯通道订阅者服务 /// [DisableAuditing] - public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe + public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService { private readonly ILogger _logger; private readonly IoTDBOptions _iotDBOptions; + private readonly IRedisPubSubService _redisPubSubService; /// /// 设备状态统一订阅订阅服务 /// /// /// - public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions) + /// + public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions, IRedisPubSubService redisPubSubService) { _logger = logger; _iotDBOptions = iotDBOptions.Value; + _redisPubSubService = redisPubSubService; } /// - /// 设备上下线通知Kafka消息订阅 + /// 设备上下线通知 Redis 消息订阅 /// - /// /// - [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)] - public async Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message) + public async Task ServiceCommunicationDeviceStatusSubscriber() { try { + DeviceStatusMessage deviceStatusMessage = null; + + // 订阅频道 + await _redisPubSubService.SubscribeAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) => + { + _logger.LogWarning($"Redis订阅收到消息: {message}"); + + deviceStatusMessage = message.Deserialize(); + }); + + if (deviceStatusMessage == null) + { + throw new UserFriendlyException($"设备状态消息为空"); + } + var deviceEntity = await FreeSqlDbContext.Instance.Select() - .Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress) + .Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress) .FirstAsync(); if (deviceEntity == null) { - return SubscribeAck.Fail(); + throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在"); } - deviceEntity.DeviceOnlineStatus = message.Status; - if (message.Status == DeviceOnlineStatusEnum.Online) + deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status; + if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Online) { - deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local); + deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); } - else if (message.Status == DeviceOnlineStatusEnum.Offline) + else if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Offline) { - deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); + deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); } var updateResult = await FreeSqlDbContext.Instance.Update() @@ -77,10 +93,11 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers .UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime }) .ExecuteAffrowsAsync(); - if (updateResult <=0) + if (updateResult <= 0) { - _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}"); - return SubscribeAck.Fail(); + _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}"); + + throw new UserFriendlyException($"设备状态更新失败"); } //更新设备数据缓存到Redis @@ -89,8 +106,6 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers deviceCacheInfos.PlatformPassword = null; RedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos); - - return SubscribeAck.Success(); } catch (Exception) {