From 2fcf9d854a1f10089a3f47065aa87db7ae3bcc26 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Fri, 22 Aug 2025 17:32:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96Redis=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E5=8F=91=E5=B8=83=E7=BB=84=E4=BB=B6=EF=BC=8C=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=88=86=E5=B8=83=E5=BC=8F=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E5=BF=83=E4=B8=BB=E9=A2=98=EF=BC=8C=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E9=AB=98=E6=80=A7=E8=83=BDTask=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E4=B8=AD=E5=BF=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../BusinessSystemAggregationService.cs | 5 ++--- .../DeviceAggregationService.cs | 7 ++----- .../IoTApplicationModule.cs | 4 ++-- ...viceCommunicationChannelSubscriberService.cs | 17 +++-------------- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs index dbb79ee..16f22fc 100644 --- a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs +++ b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs @@ -5,7 +5,6 @@ using JiShe.ServicePro.Dto; using JiShe.ServicePro.Encrypt; using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; -using JiShe.ServicePro.Kafka.Consts; using Microsoft.Extensions.Options; namespace JiShe.IoT.BusinessSystemAggregation @@ -65,11 +64,11 @@ namespace JiShe.IoT.BusinessSystemAggregation //将指令存储Kafka的OneNET主题中 if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, input); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input); } else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, input); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input); } else { diff --git a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs index 626a66e..5715fab 100644 --- a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs +++ b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs @@ -7,9 +7,6 @@ using JiShe.ServicePro.DeviceManagement.Permissions; using JiShe.ServicePro.Dto; using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; -using JiShe.ServicePro.Kafka.Consts; -using JiShe.ServicePro.Kafka.Producer; -using JiShe.ServicePro.OneNET.Provider.OpenApiModels.Devices; using JiShe.ServicePro.OneNETManagement.OneNETDevices; using JiShe.ServicePro.OneNETManagement.OneNETProducts; using Mapster; @@ -300,7 +297,7 @@ namespace JiShe.IoT.DeviceAggregation } else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName,commandRequest); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName,commandRequest); return true; } else @@ -599,7 +596,7 @@ namespace JiShe.IoT.DeviceAggregation throw new UserFriendlyException("设备不在线"); } - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, commandRequest); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, commandRequest); return true; } catch (Exception) diff --git a/src/JiShe.IoT.Application/IoTApplicationModule.cs b/src/JiShe.IoT.Application/IoTApplicationModule.cs index 999eba0..0649e51 100644 --- a/src/JiShe.IoT.Application/IoTApplicationModule.cs +++ b/src/JiShe.IoT.Application/IoTApplicationModule.cs @@ -60,8 +60,8 @@ namespace JiShe.IoT var commonService = context.ServiceProvider.GetRequiredService(); commonService.InitSelectTypetList(); - var issueSubscriberService = context.ServiceProvider.GetRequiredService(); - issueSubscriberService.IssueCommandRedisSubscriber(); + var serviceCommunicationChannelSubscriberService = context.ServiceProvider.GetRequiredService(); + serviceCommunicationChannelSubscriberService.ServiceCommunicationDeviceStatusSubscriber(); } } } diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index f8b8927..edb7ec4 100644 --- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -45,21 +45,10 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers // 订阅频道 - await _redisPubSubService.SubscribeReliableAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) => + await _redisPubSubService.SubscribeReliableAsync(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) => { - try - { - //_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}"); - - HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult(); - return true; - } - catch (Exception ex) - { - _logger.LogError($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}"); - await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message); - return false; - } + await HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql); + return true; }); } catch (Exception)