完善Redis组件,支持发布订阅
This commit is contained in:
parent
79a42085dc
commit
2b5a3a60e2
@ -1 +1 @@
|
||||
Subproject commit 3dbd0ebf5fff82ae03945631aa8e401620931ab1
|
||||
Subproject commit b61e2d1f76755f3763fcbc1256abd2d75c94f7a9
|
||||
@ -1,7 +1,4 @@
|
||||
using JiShe.ServicePro.Dto;
|
||||
using JiShe.ServicePro.Kafka.Internal;
|
||||
|
||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
{
|
||||
/// <summary>
|
||||
/// 数据通讯通道订阅者服务
|
||||
@ -9,10 +6,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
public interface IServiceCommunicationChannelSubscriberService
|
||||
{
|
||||
/// <summary>
|
||||
/// 设备上下线通知Kafka消息订阅
|
||||
/// 设备上下线通知 Redis 消息订阅
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message);
|
||||
Task ServiceCommunicationDeviceStatusSubscriber();
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,51 +25,67 @@ using Volo.Abp.Auditing;
|
||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
{
|
||||
/// <summary>
|
||||
/// 设备状态统一订阅订阅服务
|
||||
/// 数据通讯通道订阅者服务
|
||||
/// </summary>
|
||||
[DisableAuditing]
|
||||
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
|
||||
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
|
||||
{
|
||||
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
||||
private readonly IoTDBOptions _iotDBOptions;
|
||||
private readonly IRedisPubSubService _redisPubSubService;
|
||||
|
||||
/// <summary>
|
||||
/// 设备状态统一订阅订阅服务
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="iotDBOptions"></param>
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions)
|
||||
/// <param name="redisPubSubService"></param>
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
|
||||
{
|
||||
_logger = logger;
|
||||
_iotDBOptions = iotDBOptions.Value;
|
||||
_redisPubSubService = redisPubSubService;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 设备上下线通知Kafka消息订阅
|
||||
/// 设备上下线通知 Redis 消息订阅
|
||||
/// </summary>
|
||||
/// <param name="message"></param>
|
||||
/// <returns></returns>
|
||||
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)]
|
||||
public async Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
|
||||
public async Task ServiceCommunicationDeviceStatusSubscriber()
|
||||
{
|
||||
try
|
||||
{
|
||||
DeviceStatusMessage deviceStatusMessage = null;
|
||||
|
||||
// 订阅频道
|
||||
await _redisPubSubService.SubscribeAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
|
||||
{
|
||||
_logger.LogWarning($"Redis订阅收到消息: {message}");
|
||||
|
||||
deviceStatusMessage = message.Deserialize<DeviceStatusMessage>();
|
||||
});
|
||||
|
||||
if (deviceStatusMessage == null)
|
||||
{
|
||||
throw new UserFriendlyException($"设备状态消息为空");
|
||||
}
|
||||
|
||||
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||
.Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress)
|
||||
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
|
||||
.FirstAsync();
|
||||
if (deviceEntity == null)
|
||||
{
|
||||
return SubscribeAck.Fail();
|
||||
throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
|
||||
}
|
||||
|
||||
deviceEntity.DeviceOnlineStatus = message.Status;
|
||||
if (message.Status == DeviceOnlineStatusEnum.Online)
|
||||
deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
|
||||
if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Online)
|
||||
{
|
||||
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local);
|
||||
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
}
|
||||
else if (message.Status == DeviceOnlineStatusEnum.Offline)
|
||||
else if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Offline)
|
||||
{
|
||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
}
|
||||
|
||||
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||
@ -79,8 +95,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
|
||||
if (updateResult <= 0)
|
||||
{
|
||||
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}");
|
||||
return SubscribeAck.Fail();
|
||||
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
|
||||
|
||||
throw new UserFriendlyException($"设备状态更新失败");
|
||||
}
|
||||
|
||||
//更新设备数据缓存到Redis
|
||||
@ -89,8 +106,6 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
deviceCacheInfos.PlatformPassword = null;
|
||||
|
||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||
|
||||
return SubscribeAck.Success();
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user