diff --git a/DockerComposeShells/iotdb/init/init-iot-db.sql b/DockerComposeShells/iotdb/init/init-iot-db.sql
index 48009e4..9ea3ccc 100644
--- a/DockerComposeShells/iotdb/init/init-iot-db.sql
+++ b/DockerComposeShells/iotdb/init/init-iot-db.sql
@@ -1,5 +1,5 @@
-CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
+CREATE TABLE IF NOT EXISTS CTWingAepReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, PlatformTenantId STRING FIELD, ProductId STRING FIELD, ServiceId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD, IMSI STRING FIELD, IMEI STRING FIELD ) COMMENT 'CTWingAepReceiveMessageEntity';
-CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
+CREATE TABLE IF NOT EXISTS DeviceTelemetryPacketTaskInfo( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, IoTPlatformProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, IoTPlatformProductName STRING FIELD, IoTPlatformAccountId STRING FIELD, AccountPhoneNumber STRING FIELD, IssueRawMessage STRING FIELD, IssuePayload STRING FIELD, ResponseRawMessage STRING FIELD, ResponsePayload STRING FIELD, TelemetrySource INT32 FIELD, IoTPlatform INT32 FIELD, RetryCount INT32 FIELD, LastIssueTime TIMESTAMP FIELD, IssueStatus INT32 FIELD ) COMMENT 'DeviceTelemetryPacketTaskInfo';
-CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, SubDevice STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';
\ No newline at end of file
+CREATE TABLE IF NOT EXISTS OneNETReceiveMessageEntity( time TIMESTAMP TIME, IoTDataType STRING TAG, DeviceType STRING TAG, DeviceAddress STRING TAG, ProductId STRING FIELD, IoTPlatformDeviceOpenInfo STRING FIELD, MessageType STRING FIELD, Protocol STRING FIELD, RawMessage STRING FIELD, IsEncrypted BOOLEAN FIELD, PlaintextMessage STRING FIELD, ReceivedPayload STRING FIELD, ReceivedTimestamps INT64 FIELD ) COMMENT 'OneNETReceiveMessageEntity';
\ No newline at end of file
diff --git a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
index 66cc594..c6a39ac 100644
--- a/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
+++ b/src/JiShe.IoT.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs
@@ -1,4 +1,6 @@
-namespace JiShe.ServicePro.OneNETManagement.Subscribers
+using JiShe.ServicePro.Dto;
+
+namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
///
/// 数据通讯通道订阅者服务
@@ -10,5 +12,12 @@
///
///
Task ServiceCommunicationDeviceStatusSubscriber();
+
+ ///
+ /// 设备状态更新
+ ///
+ ///
+ ///
+ Task DeviceStatusUpdate(DeviceStatusMessage input);
}
}
diff --git a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
index f4d00b8..5aeb726 100644
--- a/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
+++ b/src/JiShe.IoT.Application/DeviceAggregation/DeviceAggregationService.cs
@@ -492,19 +492,19 @@ namespace JiShe.IoT.DeviceAggregation
{
try
{
- if(input.AddressList == null || input.AddressList.Count <= 0)
+ if (input.AddressList == null || input.AddressList.Count <= 0)
{
- throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表不能为空","-101");
+ throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表不能为空", "-101");
}
- if (input.AddressList.Count >200)
+ if (input.AddressList.Count > 200)
{
throw new UserFriendlyException($"{nameof(DeviceBatchUpgradeForApiAsync)} 设备批量升级时地址列表数量不能超过200", "-102");
}
var (deviceFirmwareVersionInfo, fileInfo) = await ValidateAndGetFirmwareInfoAsync(input.NowFirmwareVersionDataId, input.IoTPlatformProductId, "-103", "-104");
- var deviceInfoList = await deviceAppService.FindByDeviceAddressAsync(new FindByDeviceAddressInput() { AddressList = input.AddressList ,IoTPlatform = input.IoTPlatform,IoTPlatformProductId = input.IoTPlatformProductId});
+ var deviceInfoList = await deviceAppService.FindByDeviceAddressAsync(new FindByDeviceAddressInput() { AddressList = input.AddressList, IoTPlatform = input.IoTPlatform, IoTPlatformProductId = input.IoTPlatformProductId });
//如果是OneNET平台,提前获取产品信息和物模型信息,避免在循环中重复调用
dynamic oneNETProductInfo = null;
@@ -524,7 +524,7 @@ namespace JiShe.IoT.DeviceAggregation
foreach (var item in input.AddressList)
{
- var deviceInfo = deviceInfoList.Where(d=>d.DeviceAddress== item).FirstOrDefault();
+ var deviceInfo = deviceInfoList.Where(d => d.DeviceAddress == item).FirstOrDefault();
if (deviceInfo == null)
{
deviceAddress.Add(item);
@@ -568,15 +568,21 @@ namespace JiShe.IoT.DeviceAggregation
{
try
{
- // Lua 脚本:获取并删除键(原子操作)
+ // Lua 脚本:获取并删除键(原子操作)升级记录Id与文件ID关系缓存
string cacheKey = string.Format($"{RedisConst.CacheDeviceUpgradeRecordDataKey}", input.Id);
- var redisResult = await FreeRedisProvider.Instance.GetDelAsync(cacheKey);
+ var redisResult = await FreeRedisProvider.Instance.GetAsync(cacheKey);
if (string.IsNullOrWhiteSpace(redisResult))
{
throw new UserFriendlyException($"{nameof(DownloadFirmwareInfoAsync)} 设备升级记录信息 {input.Id} 不存在");
}
- var downLoadResult = await fileAppService.AllowDownloadAsync(input);
+ //将缓存中的文件ID转换成GUID
+ Guid fileId = new Guid(redisResult);
+
+ var downLoadResult = await fileAppService.AllowDownloadAsync(new IdInput()
+ {
+ Id = fileId,
+ });
if (downLoadResult.ContentLength.HasValue && downLoadResult.ContentLength.Value > 0)
{
//更新状态为文件下载中
@@ -1100,10 +1106,10 @@ namespace JiShe.IoT.DeviceAggregation
await ioTDBDataChannelManageService.DeviceTelemetryTaskWriterAsync(DataChannelManage.DeviceTelemetryTaskDataChannel.Writer, (DistributedMessageCenterConst.OneNETUpgradeCommandIssuedEventName, packetTaskInfo));
- //将升级记录数据Id存入Redis缓存中。
+ //将升级记录数据Id作为key,文件数据ID作为值,存入Redis缓存中。
string cacheKey = string.Format($"{RedisConst.CacheDeviceUpgradeRecordDataKey}", upgradeRecordInput.Id);
-
- await FreeRedisProvider.Instance.SetAsync(cacheKey, upgradeRecordInput.Id);
+
+ await FreeRedisProvider.Instance.SetAsync(cacheKey, fileObject.Id);
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, packetTaskInfo);
diff --git a/src/JiShe.IoT.Application/Jobs/CacheDeviceDataToRedisJob.cs b/src/JiShe.IoT.Application/Jobs/DeviceServerJob.cs
similarity index 56%
rename from src/JiShe.IoT.Application/Jobs/CacheDeviceDataToRedisJob.cs
rename to src/JiShe.IoT.Application/Jobs/DeviceServerJob.cs
index 469489a..28ddd23 100644
--- a/src/JiShe.IoT.Application/Jobs/CacheDeviceDataToRedisJob.cs
+++ b/src/JiShe.IoT.Application/Jobs/DeviceServerJob.cs
@@ -10,13 +10,13 @@ using System.Threading.Tasks;
namespace JiShe.IoT.Jobs
{
///
- /// 缓存设备数据到redis
+ /// 设备服务定时任务
///
- public class CacheDeviceDataToRedisJob : SystemBackGroundWorkService
+ public class DeviceServerJob : SystemBackGroundWorkService
{
- private readonly ILogger _logger;
+ private readonly ILogger _logger;
public readonly DeviceAppService _meterAppService;
- public CacheDeviceDataToRedisJob(ILogger logger, DeviceAppService meterAppService) : base(logger)
+ public DeviceServerJob(ILogger logger, DeviceAppService meterAppService) : base(logger)
{
_logger = logger;
_meterAppService = meterAppService;
@@ -27,14 +27,16 @@ namespace JiShe.IoT.Jobs
///
///
///
- protected override Task DoWorkAsync(CancellationToken cancellationToken)
+ protected override async Task DoWorkAsync(CancellationToken cancellationToken)
{
- return _meterAppService.CacheDeviceDataToRedisAsync();
+ //缓存数据到redis
+ await _meterAppService.CacheDeviceDataToRedisAsync();
+ await _meterAppService.CacheDeviceDataToRedisAsync();
}
protected override TimeSpan GetInterval()
{
- return TimeSpan.FromHours(1);
+ return TimeSpan.FromHours(1);
}
}
}
diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
index 49b26c9..f8c790a 100644
--- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
+++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs
@@ -5,6 +5,8 @@ using JiShe.ServicePro.Dto;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider;
using Mapster;
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp;
using Volo.Abp.Auditing;
@@ -18,17 +20,21 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{
private readonly ILogger _logger;
- private readonly IReliableRedisPubSubService _redisPubSubService;
+ private readonly IReliableRedisPubSubService _reliableRedisPubSubService;
+ private readonly IServiceScopeFactory _serviceScopeFactory;
+
///
/// 设备状态统一订阅订阅服务
///
///
///
- public ServiceCommunicationChannelSubscriberService(ILogger logger, IReliableRedisPubSubService redisPubSubService)
+ ///
+ public ServiceCommunicationChannelSubscriberService(ILogger logger, IReliableRedisPubSubService redisPubSubService, IServiceScopeFactory serviceScopeFactory)
{
_logger = logger;
- _redisPubSubService = redisPubSubService;
+ _reliableRedisPubSubService = redisPubSubService;
+ _serviceScopeFactory = serviceScopeFactory;
}
///
@@ -39,15 +45,18 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
try
{
- // 为订阅回调创建独立的 FreeSql 客户端
- var callbackFreeSqlDbContext = FreeSqlDbContext;
- var callbackFreeSql = FreeRedisProvider;
-
-
// 订阅频道
- await _redisPubSubService.SubscribeReliableAsync(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
+ await _reliableRedisPubSubService.SubscribeReliableAsync(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
{
- await HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql);
+ using (var scope = _serviceScopeFactory.CreateScope())
+ {
+ var sp = scope.ServiceProvider;
+ var scopedServiceCommunicationChannelSubscriberService = sp.GetRequiredService();
+ var scopedLogger = sp.GetRequiredService>();
+ scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}收到设备{message.DeviceAddress}上下线状态更新通知,开始更新{message.Status}状态处理...");
+ await scopedServiceCommunicationChannelSubscriberService.DeviceStatusUpdate(message);
+ scopedLogger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)}设备{message.DeviceAddress}上下线状态更新通知,结束更新{message.Status}状态处理...");
+ }
return true;
});
}
@@ -61,46 +70,44 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
///
/// 设备状态处理
///
- ///
- ///
- ///
+ ///
///
///
- private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext, IFreeRedisProvider callbackFreeRedisProvider)
+ public async Task DeviceStatusUpdate(DeviceStatusMessage input)
{
try
{
- if (deviceStatusMessage == null || callbackFreeSqlDbContext == null)
+ if (input == null || input.DeviceAddress.IsNullOrWhiteSpace())
{
throw new UserFriendlyException($"设备状态消息处理失败,参数为空");
}
- var deviceEntity = await callbackFreeSqlDbContext.Instance.Select()
- .Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
+ var deviceEntity = await FreeSqlDbContext.Instance.Select()
+ .Where(d => d.IoTPlatform == input.IoTPlatform && d.DeviceAddress == input.DeviceAddress)
.FirstAsync();
if (deviceEntity == null)
{
- throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
+ throw new UserFriendlyException($"{input.DeviceAddress} 设备不存在");
}
- deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
- if (deviceStatusMessage.Status == DeviceOnlineStatus.Online)
+ deviceEntity.DeviceOnlineStatus = input.Status;
+ if (input.Status == DeviceOnlineStatus.Online)
{
- deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
+ deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
- else if (deviceStatusMessage.Status == DeviceOnlineStatus.Offline)
+ else if (input.Status == DeviceOnlineStatus.Offline)
{
- deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
+ deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(input.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
- var updateResult = await callbackFreeSqlDbContext.Instance.Update()
+ var updateResult = await FreeSqlDbContext.Instance.Update()
.SetSource(deviceEntity)
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync();
if (updateResult <= 0)
{
- _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
+ _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{input.Serialize()}");
throw new UserFriendlyException($"设备状态更新失败");
}
@@ -109,12 +116,12 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
DeviceCacheInfos deviceCacheInfos = deviceEntity.Adapt();
deviceCacheInfos.PlatformPassword = null;
- callbackFreeRedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
+ FreeRedisProvider.Instance.HSet(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
}
- catch (Exception)
+ catch (Exception ex)
{
-
- throw;
+ _logger.LogError($"{nameof(DeviceStatusUpdate)} 设备在线状态更新发生异常:{ex.Message}");
+ throw ;
}
}
}