diff --git a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs index dbb79ee..16f22fc 100644 --- a/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs +++ b/src/JiShe.IoT.Application/BusinessSystemAggregation/BusinessSystemAggregationService.cs @@ -5,7 +5,6 @@ using JiShe.ServicePro.Dto; using JiShe.ServicePro.Encrypt; using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; -using JiShe.ServicePro.Kafka.Consts; using Microsoft.Extensions.Options; namespace JiShe.IoT.BusinessSystemAggregation @@ -65,11 +64,11 @@ namespace JiShe.IoT.BusinessSystemAggregation //将指令存储Kafka的OneNET主题中 if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, input); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input); } else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, input); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input); } else { diff --git a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs index 626a66e..5715fab 100644 --- a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs +++ b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs @@ -7,9 +7,6 @@ using JiShe.ServicePro.DeviceManagement.Permissions; using JiShe.ServicePro.Dto; using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; -using JiShe.ServicePro.Kafka.Consts; -using JiShe.ServicePro.Kafka.Producer; -using JiShe.ServicePro.OneNET.Provider.OpenApiModels.Devices; using JiShe.ServicePro.OneNETManagement.OneNETDevices; using JiShe.ServicePro.OneNETManagement.OneNETProducts; using Mapster; @@ -300,7 +297,7 @@ namespace JiShe.IoT.DeviceAggregation } else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing) { - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName,commandRequest); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName,commandRequest); return true; } else @@ -599,7 +596,7 @@ namespace JiShe.IoT.DeviceAggregation throw new UserFriendlyException("设备不在线"); } - await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, commandRequest); + await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, commandRequest); return true; } catch (Exception) diff --git a/src/JiShe.IoT.Application/IoTApplicationModule.cs b/src/JiShe.IoT.Application/IoTApplicationModule.cs index 999eba0..0649e51 100644 --- a/src/JiShe.IoT.Application/IoTApplicationModule.cs +++ b/src/JiShe.IoT.Application/IoTApplicationModule.cs @@ -60,8 +60,8 @@ namespace JiShe.IoT var commonService = context.ServiceProvider.GetRequiredService(); commonService.InitSelectTypetList(); - var issueSubscriberService = context.ServiceProvider.GetRequiredService(); - issueSubscriberService.IssueCommandRedisSubscriber(); + var serviceCommunicationChannelSubscriberService = context.ServiceProvider.GetRequiredService(); + serviceCommunicationChannelSubscriberService.ServiceCommunicationDeviceStatusSubscriber(); } } } diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index f8b8927..edb7ec4 100644 --- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -45,21 +45,10 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers // 订阅频道 - await _redisPubSubService.SubscribeReliableAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) => + await _redisPubSubService.SubscribeReliableAsync(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) => { - try - { - //_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}"); - - HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult(); - return true; - } - catch (Exception ex) - { - _logger.LogError($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}"); - await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message); - return false; - } + await HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql); + return true; }); } catch (Exception)