diff --git a/JiShe.ServicePro b/JiShe.ServicePro index d88da5e..8471b53 160000 --- a/JiShe.ServicePro +++ b/JiShe.ServicePro @@ -1 +1 @@ -Subproject commit d88da5e29442a906e596baa24e262f9c4a6576fb +Subproject commit 8471b53a3f0e9f2af02b5122ace19d1d5f04a6ec diff --git a/host/JiShe.IoT.HttpApi.Host/Program.cs b/host/JiShe.IoT.HttpApi.Host/Program.cs index 385c0a1..1c9924f 100644 --- a/host/JiShe.IoT.HttpApi.Host/Program.cs +++ b/host/JiShe.IoT.HttpApi.Host/Program.cs @@ -1,5 +1,3 @@ -using Serilog.Events; - namespace JiShe.IoT; public class Program diff --git a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs new file mode 100644 index 0000000..a0df610 --- /dev/null +++ b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs @@ -0,0 +1,18 @@ +using JiShe.ServicePro.Dto; +using JiShe.ServicePro.Kafka.Internal; + +namespace JiShe.ServicePro.OneNETManagement.Subscribers +{ + /// + /// 数据通讯通道订阅者服务 + /// + public interface IServiceCommunicationChannelSubscriberService + { + /// + /// 设备上下线通知Kafka消息订阅 + /// + /// + /// + Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message); + } +} diff --git a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs index 75f830c..124327f 100644 --- a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs +++ b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs @@ -12,6 +12,7 @@ using Mapster; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.IO.Pipelines; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -54,7 +55,7 @@ namespace JiShe.IoT.DeviceAggregation throw; } - + } /// @@ -113,22 +114,7 @@ namespace JiShe.IoT.DeviceAggregation return false; } - UpdateDeviceInput updateDeviceInput = insertResult.Adapt(); - updateDeviceInput.IoTPlatformResponse = pushResult.Serialize(); - updateDeviceInput.IsPlatformPushSuccess = true; - - var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput); - if (updateResult == null) - { - logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}"); - return false; - } - - //设备数据缓存到Redis - DeviceCacheInfos deviceCacheInfos = insertResult.Adapt(); - deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse; - - RedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, insertResult.DeviceAddress, deviceCacheInfos); + await DeviceUpdateHandler(insertResult, pushResult); return true; } @@ -146,7 +132,7 @@ namespace JiShe.IoT.DeviceAggregation /// public async Task CTWingDeviceCreateAsync(CreateDeviceAggregationInput input) { - throw new UserFriendlyException($"CTWing 设备创建失败,功能未实现。"); + throw new UserFriendlyException($"CTWing 设备创建失败,功能未实现。"); } /// @@ -183,14 +169,62 @@ namespace JiShe.IoT.DeviceAggregation { try { - var entityDevice = await FreeSqlDbContext.Instance.Select().Where(f => f.Id == input.Id).FirstAsync(); + var entityDevice = await FreeSqlDbContext.Instance.Select().Where(f => f.Id == input.Id).FirstAsync(); if (entityDevice == null) { throw new UserFriendlyException($"推送失败,未找到设备数据"); } + if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.CTWing) + { + return await RepushDeviceInfoToCTWing(entityDevice); + } + else if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.OneNET) + { + return await RepushDeviceInfoToOneNET(entityDevice); + } + + throw new UserFriendlyException($"推送失败,异常的物联网平台"); + } + catch (Exception) + { + + throw; + } + } + + /// + /// 重新推送设备信息到CTWing物联网平台 + /// + /// + /// + /// + public async Task RepushDeviceInfoToCTWing(DeviceManagementInfoDto input) + { + try + { + throw new UserFriendlyException($"推送失败,CTWing暂未实现。"); + } + catch (Exception) + { + + throw; + } + } + + /// + /// 重新推送设备信息到OneNET物联网平台 + /// + /// + /// + /// + public async Task RepushDeviceInfoToOneNET(DeviceManagementInfoDto input) + { + try + { + var productInfo = await FreeSqlDbContext.Instance.Select() - .Where(e => e.IsEnabled == true && e.IoTPlatformProductId == entityDevice.IoTPlatformProductId) + .Where(e => e.IsEnabled == true && e.IoTPlatformProductId == input.IoTPlatformProductId) .FirstAsync(); if (productInfo == null) @@ -202,10 +236,10 @@ namespace JiShe.IoT.DeviceAggregation //推送至OneNET平台 var pushResult = await oneNETDeviceService.CreateDeviceInfoAsync(new CreateDeviceInfoInput() { - DeviceName = entityDevice.IoTPlatformDeviceOpenInfo, + DeviceName = input.IoTPlatformDeviceOpenInfo, ProductId = productInfo.IoTPlatformProductId, OneNETAccountId = productInfo.OneNETAccountId, - Description = entityDevice.DeviceAddress, + Description = input.DeviceAddress, }); if (pushResult == null || pushResult.Code != ServicePro.Enums.ResponeResultEnum.Success) @@ -215,22 +249,7 @@ namespace JiShe.IoT.DeviceAggregation } - UpdateDeviceInput updateDeviceInput = entityDevice.Adapt(); - updateDeviceInput.IoTPlatformResponse = pushResult.Serialize(); - - var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput); - if (updateResult == null) - { - logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}"); - throw new UserFriendlyException($"推送结果更新失败。"); - } - - //设备数据缓存到Redis - DeviceCacheInfos deviceCacheInfos = entityDevice.Adapt(); - deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse; - RedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, entityDevice.DeviceAddress, deviceCacheInfos); - - return entityDevice.Adapt(); + return await DeviceUpdateHandler(input, pushResult); } catch (Exception) { @@ -238,5 +257,35 @@ namespace JiShe.IoT.DeviceAggregation throw; } } + + /// + /// 更新设备信息并处理缓存 + /// + /// + /// + /// + /// + private async Task DeviceUpdateHandler(DeviceManagementInfoDto input, HttpDataResult pushResult) + { + UpdateDeviceInput updateDeviceInput = input.Adapt(); + updateDeviceInput.IoTPlatformResponse = pushResult.Serialize(); + updateDeviceInput.IsPlatformPushSuccess = true; + + var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput); + if (updateResult == null) + { + logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}"); + throw new UserFriendlyException($"推送结果更新失败。"); + } + + //设备数据缓存到Redis + DeviceCacheInfos deviceCacheInfos = input.Adapt(); + deviceCacheInfos.IoTPlatformResponse = null; + deviceCacheInfos.PlatformPassword = null; + + RedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos); + + return input.Adapt(); + } } } diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs new file mode 100644 index 0000000..1cbfaab --- /dev/null +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -0,0 +1,95 @@ +using JiShe.IoT; +using JiShe.ServicePro.ApacheIoTDB.Provider.Model; +using JiShe.ServicePro.ApacheIoTDB.Provider.Options; +using JiShe.ServicePro.Consts; +using JiShe.ServicePro.Core; +using JiShe.ServicePro.DataChannelManages; +using JiShe.ServicePro.DeviceManagement.DeviceInfos; +using JiShe.ServicePro.Dto; +using JiShe.ServicePro.Enums; +using JiShe.ServicePro.FreeRedisProvider; +using JiShe.ServicePro.IoTDBManagement.DataChannels; +using JiShe.ServicePro.IoTDBManagement.TableModels; +using JiShe.ServicePro.Kafka.Attributes; +using JiShe.ServicePro.Kafka.Consts; +using JiShe.ServicePro.Kafka.Internal; +using JiShe.ServicePro.OneNET.Provider.ReceiveModels; +using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData; +using JiShe.ServicePro.ServerOptions; +using Mapster; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Volo.Abp; +using Volo.Abp.Auditing; + +namespace JiShe.ServicePro.OneNETManagement.Subscribers +{ + /// + /// 设备状态统一订阅订阅服务 + /// + [DisableAuditing] + public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe + { + private readonly ILogger _logger; + private readonly IoTDBOptions _iotDBOptions; + + /// + /// 设备状态统一订阅订阅服务 + /// + /// + /// + public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions) + { + _logger = logger; + _iotDBOptions = iotDBOptions.Value; + } + + /// + /// 设备上下线通知Kafka消息订阅 + /// + /// + /// + [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)] + public async Task ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message) + { + try + { + var deviceEntity = await FreeSqlDbContext.Instance.Select() + .Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress) + .FirstAsync(); + if (deviceEntity == null) + { + return SubscribeAck.Fail(); + } + + deviceEntity.DeviceOnlineStatus = message.Status; + if (message.Status == DeviceOnlineStatusEnum.Online) + { + deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local); + } + else if (message.Status == DeviceOnlineStatusEnum.Offline) + { + deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); + } + + var updateResult = await FreeSqlDbContext.Instance.Update() + .SetSource(deviceEntity) + .UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime }) + .ExecuteAffrowsAsync(); + + if (updateResult <=0) + { + _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}"); + return SubscribeAck.Fail(); + } + + return SubscribeAck.Success(); + } + catch (Exception) + { + + throw; + } + } + } +}