From 2b5a3a60e247ce34596acad9055299c1fcf2092a Mon Sep 17 00:00:00 2001
From: ChenYi <296215406@outlook.com>
Date: Thu, 14 Aug 2025 11:45:58 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84Redis=E7=BB=84=E4=BB=B6?=
=?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
JiShe.ServicePro | 2 +-
...ceCommunicationChannelSubscriberService.cs | 12 ++---
...ceCommunicationChannelSubscriberService.cs | 53 ++++++++++++-------
3 files changed, 39 insertions(+), 28 deletions(-)
diff --git a/JiShe.ServicePro b/JiShe.ServicePro
index 3dbd0eb..b61e2d1 160000
--- a/JiShe.ServicePro
+++ b/JiShe.ServicePro
@@ -1 +1 @@
-Subproject commit 3dbd0ebf5fff82ae03945631aa8e401620931ab1
+Subproject commit b61e2d1f76755f3763fcbc1256abd2d75c94f7a9
diff --git a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
index a0df610..66cc594 100644
--- a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
+++ b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
@@ -1,7 +1,4 @@
-using JiShe.ServicePro.Dto;
-using JiShe.ServicePro.Kafka.Internal;
-
-namespace JiShe.ServicePro.OneNETManagement.Subscribers
+namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
///
/// 数据通讯通道订阅者服务
@@ -9,10 +6,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public interface IServiceCommunicationChannelSubscriberService
{
///
- /// 设备上下线通知Kafka消息订阅
- ///
- ///
+ /// 设备上下线通知 Redis 消息订阅
+ ///
///
- Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message);
+ Task ServiceCommunicationDeviceStatusSubscriber();
}
}
diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
index 2a5b3f1..60fd3d8 100644
--- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
+++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
@@ -25,51 +25,67 @@ using Volo.Abp.Auditing;
namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
///
- /// 设备状态统一订阅订阅服务
+ /// 数据通讯通道订阅者服务
///
[DisableAuditing]
- public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
+ public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{
private readonly ILogger _logger;
private readonly IoTDBOptions _iotDBOptions;
+ private readonly IRedisPubSubService _redisPubSubService;
///
/// 设备状态统一订阅订阅服务
///
///
///
- public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions)
+ ///
+ public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions, IRedisPubSubService redisPubSubService)
{
_logger = logger;
_iotDBOptions = iotDBOptions.Value;
+ _redisPubSubService = redisPubSubService;
}
///
- /// 设备上下线通知Kafka消息订阅
+ /// 设备上下线通知 Redis 消息订阅
///
- ///
///
- [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)]
- public async Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
+ public async Task ServiceCommunicationDeviceStatusSubscriber()
{
try
{
+ DeviceStatusMessage deviceStatusMessage = null;
+
+ // 订阅频道
+ await _redisPubSubService.SubscribeAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
+ {
+ _logger.LogWarning($"Redis订阅收到消息: {message}");
+
+ deviceStatusMessage = message.Deserialize();
+ });
+
+ if (deviceStatusMessage == null)
+ {
+ throw new UserFriendlyException($"设备状态消息为空");
+ }
+
var deviceEntity = await FreeSqlDbContext.Instance.Select()
- .Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress)
+ .Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
.FirstAsync();
if (deviceEntity == null)
{
- return SubscribeAck.Fail();
+ throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
}
- deviceEntity.DeviceOnlineStatus = message.Status;
- if (message.Status == DeviceOnlineStatusEnum.Online)
+ deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
+ if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Online)
{
- deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local);
+ deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
- else if (message.Status == DeviceOnlineStatusEnum.Offline)
+ else if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Offline)
{
- deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
+ deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
var updateResult = await FreeSqlDbContext.Instance.Update()
@@ -77,10 +93,11 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync();
- if (updateResult <=0)
+ if (updateResult <= 0)
{
- _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}");
- return SubscribeAck.Fail();
+ _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
+
+ throw new UserFriendlyException($"设备状态更新失败");
}
//更新设备数据缓存到Redis
@@ -89,8 +106,6 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceCacheInfos.PlatformPassword = null;
RedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
-
- return SubscribeAck.Success();
}
catch (Exception)
{