From 777e1303a910ac8b0f358869d3112aa0fd0a1e38 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Thu, 8 Jan 2026 17:29:54 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=8D=87=E7=BA=A7=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E6=9B=B4=E6=96=B0=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ceCommunicationChannelSubscriberService.cs | 17 ++- .../IoTApplicationModule.cs | 1 + ...ceCommunicationChannelSubscriberService.cs | 100 +++++++++++++++++- 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs index c6a39ac..52ded05 100644 --- a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs @@ -14,10 +14,23 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers Task ServiceCommunicationDeviceStatusSubscriber(); /// - /// 设备状态更新 + /// 设备在线状态更新 /// /// /// - Task DeviceStatusUpdate(DeviceStatusMessage input); + Task DeviceOnlineStatusUpdate(DeviceStatusMessage input); + + /// + /// 设备升级结果通知 Redis 消息订阅 + /// + /// + Task ServiceCommunicationDeviceUpgradeSubscriber(); + + /// + /// 设备升级结果更新 + /// + /// + /// + Task DeviceUpgradeResultUpdate(UpgradeResultMessage input); } } diff --git a/src/JiShe.IoT.Application/IoTApplicationModule.cs b/src/JiShe.IoT.Application/IoTApplicationModule.cs index 09fa0f5..de431fb 100644 --- a/src/JiShe.IoT.Application/IoTApplicationModule.cs +++ b/src/JiShe.IoT.Application/IoTApplicationModule.cs @@ -61,6 +61,7 @@ namespace JiShe.IoT var serviceCommunicationChannelSubscriberService = context.ServiceProvider.GetRequiredService(); serviceCommunicationChannelSubscriberService.ServiceCommunicationDeviceStatusSubscriber(); + serviceCommunicationChannelSubscriberService.ServiceCommunicationDeviceUpgradeSubscriber(); } } } diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index f8c790a..bb4bd07 100644 --- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -2,6 +2,7 @@ using JiShe.ServicePro.Core; using JiShe.ServicePro.DeviceManagement.DeviceInfos; using JiShe.ServicePro.Dto; +using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeSqlProvider; using Mapster; @@ -54,7 +55,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers var scopedServiceCommunicationChannelSubscriberService = sp.GetRequiredService(); var scopedLogger = sp.GetRequiredService>(); scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}收到设备{message.DeviceAddress}上下线状态更新通知,开始更新{message.Status}状态处理..."); - await scopedServiceCommunicationChannelSubscriberService.DeviceStatusUpdate(message); + await scopedServiceCommunicationChannelSubscriberService.DeviceOnlineStatusUpdate(message); scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}设备{message.DeviceAddress}上下线状态更新通知,结束更新{message.Status}状态处理..."); } return true; @@ -73,7 +74,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers /// /// /// - public async Task DeviceStatusUpdate(DeviceStatusMessage input) + public async Task DeviceOnlineStatusUpdate(DeviceStatusMessage input) { try { @@ -120,9 +121,102 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers } catch (Exception ex) { - _logger.LogError($"{nameof(DeviceStatusUpdate)} 设备在线状态更新发生异常:{ex.Message}"); + _logger.LogError($"{nameof(DeviceOnlineStatusUpdate)} 设备在线状态更新发生异常:{ex.Message}"); throw ; } } + + + /// + /// 设备升级结果通知 Redis 消息订阅 + /// + /// + public async Task ServiceCommunicationDeviceUpgradeSubscriber() + { + try + { + // 订阅频道 + await _reliableRedisPubSubService.SubscribeReliableAsync(DistributedMessageCenterConst.ServiceCommunicationDeviceUpgradeEventName, async (message) => + { + using (var scope = _serviceScopeFactory.CreateScope()) + { + var sp = scope.ServiceProvider; + var scopedServiceCommunicationChannelSubscriberService = sp.GetRequiredService(); + var scopedLogger = sp.GetRequiredService>(); + scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceUpgradeSubscriber)}收到设备{message.DeviceAddress} 升级结果更新通知,开始更新{message.ResultType}状态处理..."); + await scopedServiceCommunicationChannelSubscriberService.DeviceUpgradeResultUpdate(message); + scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceUpgradeSubscriber)}设备{message.DeviceAddress} 升级结果更新通知,结束更新{message.ResultType}状态处理..."); + } + return true; + }); + } + catch (Exception) + { + + throw; + } + } + + /// + /// 设备升级结果更新 + /// + /// + /// + /// + public async Task DeviceUpgradeResultUpdate(UpgradeResultMessage input) + { + try + { + if (input == null || input.DeviceAddress.IsNullOrWhiteSpace()) + { + throw new UserFriendlyException($"设备升级结果消息处理失败,参数为空"); + } + + var deviceUpgradeRecordEntity = await FreeSqlDbContext.Instance.Select() + .Where(d => d.UpgradeIdentifier == input.UpgradeIdentifier && d.DeviceAddress == input.DeviceAddress) + .FirstAsync(); + if (deviceUpgradeRecordEntity == null) + { + throw new UserFriendlyException($"设备 {input.DeviceAddress} 的标识符{input.UpgradeIdentifier}升级记录不存在"); + } + + + if (input.ResultType == DeviceUpgradeResultTypeEnum.Success) + { + deviceUpgradeRecordEntity.UpgradeResult = input.ResultType; + deviceUpgradeRecordEntity.UpgradeStatus = DeviceUpgradeStatusTypeEnum.UpgradeSuccess; + deviceUpgradeRecordEntity.UpgradeDate = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); + } + else + { + deviceUpgradeRecordEntity.UpgradeResult = input.ResultType; + deviceUpgradeRecordEntity.UpgradeStatus = DeviceUpgradeStatusTypeEnum.UpgradeFailed; + deviceUpgradeRecordEntity.UpgradeDate = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); + } + + var updateResult = await FreeSqlDbContext.Instance.Update() + .SetSource(deviceUpgradeRecordEntity) + .UpdateColumns(a => new { a.UpgradeResult, a.UpgradeStatus, a.UpgradeDate }) + .ExecuteAffrowsAsync(); + + if (updateResult <= 0) + { + _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{input.Serialize()}"); + + throw new UserFriendlyException($"设备状态更新失败"); + } + + //更新设备数据缓存到Redis + DeviceCacheInfos deviceCacheInfos = deviceUpgradeRecordEntity.Adapt(); + deviceCacheInfos.PlatformPassword = null; + + FreeRedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, deviceUpgradeRecordEntity.DeviceAddress, deviceCacheInfos); + } + catch (Exception ex) + { + _logger.LogError($"{nameof(DeviceUpgradeResultUpdate)} 设备升级结果状态更新发生异常:{ex.Message}"); + throw; + } + } } }