Compare commits

..

3 Commits

Author SHA1 Message Date
ChenYi
d0ce361ac8 更新 2025-07-31 15:51:57 +08:00
ChenYi
5e60436026 优化IoTDB时间戳展示,实现设备在线状态以及时间及时更新。 2025-07-31 15:51:46 +08:00
ChenYi
8d325a57d8 完善设备管理设计,展示对应的所属账号手机号 2025-07-31 10:43:36 +08:00
9 changed files with 285 additions and 45 deletions

1
.gitignore vendored
View File

@ -56,3 +56,4 @@
/test/JiShe.IoT.HttpApi.Client.ConsoleTestApp/bin/Release/net9.0
/test/JiShe.IoT.TestBase/bin/Release/net9.0
/host/JiShe.IoT.HttpApi.Host/UploadFile/20250729/host/abp-file-management
/host/JiShe.IoT.HttpApi.Host/UploadFile/20250731/host/abp-file-management/3a1b7059-f930-8a33-5a5b-a67f48f2feec

@ -1 +1 @@
Subproject commit d88da5e29442a906e596baa24e262f9c4a6576fb
Subproject commit 9fe622501ad8a33ad13e3280c303d806923eeb40

View File

@ -1,5 +1,3 @@
using Serilog.Events;
namespace JiShe.IoT;
public class Program

View File

@ -0,0 +1,18 @@
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Kafka.Internal;
namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
/// <summary>
/// 数据通讯通道订阅者服务
/// </summary>
public interface IServiceCommunicationChannelSubscriberService
{
/// <summary>
/// 设备上下线通知Kafka消息订阅
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message);
}
}

View File

@ -12,6 +12,7 @@ using Mapster;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -88,6 +89,8 @@ namespace JiShe.IoT.DeviceAggregation
createDeviceInput.IoTPlatformAccountId = productInfo.OneNETAccountId;
createDeviceInput.IoTPlatformDeviceOpenInfo = $"{input.IoTPlatformProductId}{input.DeviceAddress}";
createDeviceInput.PlatformPassword = productInfo.ProductAccesskey;
createDeviceInput.IoTPlatformProductName = productInfo.ProductName;
createDeviceInput.AccountPhoneNumber = productInfo.AccountPhoneNumber;
var insertResult = await deviceAppService.CreateAsync(createDeviceInput);
if (insertResult == null)
@ -111,22 +114,7 @@ namespace JiShe.IoT.DeviceAggregation
return false;
}
UpdateDeviceInput updateDeviceInput = insertResult.Adapt<UpdateDeviceInput>();
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
updateDeviceInput.IsPlatformPushSuccess = true;
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
if (updateResult == null)
{
logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}");
return false;
}
//设备数据缓存到Redis
DeviceCacheInfos deviceCacheInfos = insertResult.Adapt<DeviceCacheInfos>();
deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse;
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, insertResult.DeviceAddress, deviceCacheInfos);
await DeviceUpdateHandler(insertResult, pushResult);
return true;
}
@ -144,7 +132,7 @@ namespace JiShe.IoT.DeviceAggregation
/// <returns></returns>
public async Task<bool> CTWingDeviceCreateAsync(CreateDeviceAggregationInput input)
{
throw new UserFriendlyException($"CTWing 设备创建失败,功能未实现。");
throw new UserFriendlyException($"CTWing 设备创建失败,功能未实现。");
}
/// <summary>
@ -181,14 +169,62 @@ namespace JiShe.IoT.DeviceAggregation
{
try
{
var entityDevice = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>().Where(f => f.Id == input.Id).FirstAsync();
var entityDevice = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>().Where(f => f.Id == input.Id).FirstAsync<DeviceManagementInfoDto>();
if (entityDevice == null)
{
throw new UserFriendlyException($"推送失败,未找到设备数据");
}
if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.CTWing)
{
return await RepushDeviceInfoToCTWing(entityDevice);
}
else if (entityDevice.IoTPlatform == ServicePro.Enums.IoTPlatformTypeEnum.OneNET)
{
return await RepushDeviceInfoToOneNET(entityDevice);
}
throw new UserFriendlyException($"推送失败,异常的物联网平台");
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 重新推送设备信息到CTWing物联网平台
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
public async Task<DeviceManagementInfoDto> RepushDeviceInfoToCTWing(DeviceManagementInfoDto input)
{
try
{
throw new UserFriendlyException($"推送失败CTWing暂未实现。");
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 重新推送设备信息到OneNET物联网平台
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
public async Task<DeviceManagementInfoDto> RepushDeviceInfoToOneNET(DeviceManagementInfoDto input)
{
try
{
var productInfo = await FreeSqlDbContext.Instance.Select<OneNETProductInfos>()
.Where(e => e.IsEnabled == true && e.IoTPlatformProductId == entityDevice.IoTPlatformProductId)
.Where(e => e.IsEnabled == true && e.IoTPlatformProductId == input.IoTPlatformProductId)
.FirstAsync();
if (productInfo == null)
@ -200,10 +236,10 @@ namespace JiShe.IoT.DeviceAggregation
//推送至OneNET平台
var pushResult = await oneNETDeviceService.CreateDeviceInfoAsync(new CreateDeviceInfoInput()
{
DeviceName = entityDevice.IoTPlatformDeviceOpenInfo,
DeviceName = input.IoTPlatformDeviceOpenInfo,
ProductId = productInfo.IoTPlatformProductId,
OneNETAccountId = productInfo.OneNETAccountId,
Description = entityDevice.DeviceAddress,
Description = input.DeviceAddress,
});
if (pushResult == null || pushResult.Code != ServicePro.Enums.ResponeResultEnum.Success)
@ -213,22 +249,7 @@ namespace JiShe.IoT.DeviceAggregation
}
UpdateDeviceInput updateDeviceInput = entityDevice.Adapt<UpdateDeviceInput>();
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
if (updateResult == null)
{
logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}");
throw new UserFriendlyException($"推送结果更新失败。");
}
//设备数据缓存到Redis
DeviceCacheInfos deviceCacheInfos = entityDevice.Adapt<DeviceCacheInfos>();
deviceCacheInfos.IoTPlatformResponse = updateDeviceInput.IoTPlatformResponse;
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, entityDevice.DeviceAddress, deviceCacheInfos);
return entityDevice.Adapt<DeviceManagementInfoDto>();
return await DeviceUpdateHandler(input, pushResult);
}
catch (Exception)
{
@ -236,5 +257,35 @@ namespace JiShe.IoT.DeviceAggregation
throw;
}
}
/// <summary>
/// 更新设备信息并处理缓存
/// </summary>
/// <param name="input"></param>
/// <param name="pushResult"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
private async Task<DeviceManagementInfoDto> DeviceUpdateHandler(DeviceManagementInfoDto input, HttpDataResult pushResult)
{
UpdateDeviceInput updateDeviceInput = input.Adapt<UpdateDeviceInput>();
updateDeviceInput.IoTPlatformResponse = pushResult.Serialize();
updateDeviceInput.IsPlatformPushSuccess = true;
var updateResult = await deviceAppService.UpdateAsync(updateDeviceInput);
if (updateResult == null)
{
logger.LogError($"{nameof(CreateAsync)} 更新设备信息失败:{input.Serialize()}");
throw new UserFriendlyException($"推送结果更新失败。");
}
//设备数据缓存到Redis
DeviceCacheInfos deviceCacheInfos = input.Adapt<DeviceCacheInfos>();
deviceCacheInfos.IoTPlatformResponse = null;
deviceCacheInfos.PlatformPassword = null;
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, input.DeviceAddress, deviceCacheInfos);
return input.Adapt<DeviceManagementInfoDto>();
}
}
}

View File

@ -0,0 +1,95 @@
using JiShe.IoT;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.Kafka.Attributes;
using JiShe.ServicePro.Kafka.Consts;
using JiShe.ServicePro.Kafka.Internal;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
using JiShe.ServicePro.ServerOptions;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp;
using Volo.Abp.Auditing;
namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
/// <summary>
/// 设备状态统一订阅订阅服务
/// </summary>
[DisableAuditing]
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
{
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
private readonly IoTDBOptions _iotDBOptions;
/// <summary>
/// 设备状态统一订阅订阅服务
/// </summary>
/// <param name="logger"></param>
/// <param name="iotDBOptions"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions)
{
_logger = logger;
_iotDBOptions = iotDBOptions.Value;
}
/// <summary>
/// 设备上下线通知Kafka消息订阅
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)]
public async Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
{
try
{
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
.Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress)
.FirstAsync();
if (deviceEntity == null)
{
return SubscribeAck.Fail();
}
deviceEntity.DeviceOnlineStatus = message.Status;
if (message.Status == DeviceOnlineStatusEnum.Online)
{
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local);
}
else if (message.Status == DeviceOnlineStatusEnum.Offline)
{
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
.SetSource(deviceEntity)
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync();
if (updateResult <=0)
{
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}");
return SubscribeAck.Fail();
}
return SubscribeAck.Success();
}
catch (Exception)
{
throw;
}
}
}
}

View File

@ -13,7 +13,7 @@ using Volo.Abp.EntityFrameworkCore;
namespace JiShe.IoT.Migrations
{
[DbContext(typeof(IoTDbContext))]
[Migration("20250730091642_InitialCreate")]
[Migration("20250731021938_InitialCreate")]
partial class InitialCreate
{
/// <inheritdoc />
@ -250,6 +250,9 @@ namespace JiShe.IoT.Migrations
b.Property<int>("AccessType")
.HasColumnType("int");
b.Property<string>("AccountPhoneNumber")
.HasColumnType("longtext");
b.Property<int>("AuthType")
.HasColumnType("int");
@ -551,6 +554,12 @@ namespace JiShe.IoT.Migrations
b.Property<Guid>("Id")
.HasColumnType("char(36)");
b.Property<string>("AccountPhoneNumber")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasComment("账户手机号");
b.Property<string>("ConcurrencyStamp")
.IsConcurrencyToken()
.IsRequired()
@ -586,6 +595,10 @@ namespace JiShe.IoT.Migrations
.HasColumnType("varchar(128)")
.HasComment("设备名称");
b.Property<int?>("DeviceOnlineStatus")
.HasColumnType("int")
.HasComment("设备在线状态");
b.Property<string>("ExtraProperties")
.HasColumnType("longtext")
.HasColumnName("ExtraProperties")
@ -613,6 +626,12 @@ namespace JiShe.IoT.Migrations
.HasColumnType("varchar(50)")
.HasComment("物联网平台中对应的产品Id");
b.Property<string>("IoTPlatformProductName")
.IsRequired()
.HasMaxLength(128)
.HasColumnType("varchar(128)")
.HasComment("物联网平台中对应的产品Name");
b.Property<string>("IoTPlatformResponse")
.HasColumnType("text")
.HasComment("物联网平台返回的响应信息");
@ -624,7 +643,8 @@ namespace JiShe.IoT.Migrations
.HasColumnName("IsDeleted");
b.Property<bool>("IsPlatformPushSuccess")
.HasColumnType("tinyint(1)");
.HasColumnType("tinyint(1)")
.HasComment("物联网平台推送是否成功");
b.Property<DateTime?>("LastModificationTime")
.HasColumnType("datetime(6)")
@ -634,6 +654,14 @@ namespace JiShe.IoT.Migrations
.HasColumnType("char(36)")
.HasColumnName("LastModifierId");
b.Property<DateTime?>("LastOfflineTime")
.HasColumnType("datetime(6)")
.HasComment("最后离线时间");
b.Property<DateTime?>("LastOnlineTime")
.HasColumnType("datetime(6)")
.HasComment("最后在线时间");
b.Property<int?>("OSACreatorId")
.HasColumnType("int")
.HasComment("旧系统授权创建者Id");
@ -1347,6 +1375,11 @@ namespace JiShe.IoT.Migrations
.HasColumnType("int")
.HasComment("接入协议");
b.Property<string>("AccountPhoneNumber")
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasComment("账户手机号");
b.Property<string>("Brand")
.HasColumnType("longtext")
.HasComment("产品品牌");

View File

@ -796,6 +796,8 @@ namespace JiShe.IoT.Migrations
Id = table.Column<Guid>(type: "char(36)", nullable: false, collation: "ascii_general_ci"),
CTWingAccountId = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "账号ID")
.Annotation("MySql:CharSet", "utf8mb4"),
AccountPhoneNumber = table.Column<string>(type: "longtext", nullable: true)
.Annotation("MySql:CharSet", "utf8mb4"),
CommunicationAddress = table.Column<string>(type: "varchar(128)", maxLength: 128, nullable: false, comment: "通讯服务地址")
.Annotation("MySql:CharSet", "utf8mb4"),
MasterKey = table.Column<string>(type: "varchar(256)", maxLength: 256, nullable: true, comment: "OpenAPI 通信主密钥")
@ -872,11 +874,18 @@ namespace JiShe.IoT.Migrations
.Annotation("MySql:CharSet", "utf8mb4"),
IoTPlatformProductId = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "物联网平台中对应的产品Id")
.Annotation("MySql:CharSet", "utf8mb4"),
IoTPlatformProductName = table.Column<string>(type: "varchar(128)", maxLength: 128, nullable: false, comment: "物联网平台中对应的产品Name")
.Annotation("MySql:CharSet", "utf8mb4"),
IoTPlatformAccountId = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "物联网平台中对应的账号Id")
.Annotation("MySql:CharSet", "utf8mb4"),
AccountPhoneNumber = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "账户手机号")
.Annotation("MySql:CharSet", "utf8mb4"),
IoTPlatformResponse = table.Column<string>(type: "text", nullable: true, comment: "物联网平台返回的响应信息")
.Annotation("MySql:CharSet", "utf8mb4"),
IsPlatformPushSuccess = table.Column<bool>(type: "tinyint(1)", nullable: false),
IsPlatformPushSuccess = table.Column<bool>(type: "tinyint(1)", nullable: false, comment: "物联网平台推送是否成功"),
DeviceOnlineStatus = table.Column<int>(type: "int", nullable: true, comment: "设备在线状态"),
LastOnlineTime = table.Column<DateTime>(type: "datetime(6)", nullable: true, comment: "最后在线时间"),
LastOfflineTime = table.Column<DateTime>(type: "datetime(6)", nullable: true, comment: "最后离线时间"),
ConcurrencyStamp = table.Column<string>(type: "varchar(40)", maxLength: 40, nullable: false)
.Annotation("MySql:CharSet", "utf8mb4"),
CreationTime = table.Column<DateTime>(type: "datetime(6)", nullable: false),
@ -996,6 +1005,8 @@ namespace JiShe.IoT.Migrations
Id = table.Column<Guid>(type: "char(36)", nullable: false, collation: "ascii_general_ci"),
OneNETAccountId = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "账户Id")
.Annotation("MySql:CharSet", "utf8mb4"),
AccountPhoneNumber = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: true, comment: "账户手机号")
.Annotation("MySql:CharSet", "utf8mb4"),
IoTPlatformProductId = table.Column<string>(type: "varchar(50)", maxLength: 50, nullable: false, comment: "物联网平台对应的产品Id")
.Annotation("MySql:CharSet", "utf8mb4"),
ProductName = table.Column<string>(type: "varchar(128)", maxLength: 128, nullable: true, comment: "产品名称")

View File

@ -247,6 +247,9 @@ namespace JiShe.IoT.Migrations
b.Property<int>("AccessType")
.HasColumnType("int");
b.Property<string>("AccountPhoneNumber")
.HasColumnType("longtext");
b.Property<int>("AuthType")
.HasColumnType("int");
@ -548,6 +551,12 @@ namespace JiShe.IoT.Migrations
b.Property<Guid>("Id")
.HasColumnType("char(36)");
b.Property<string>("AccountPhoneNumber")
.IsRequired()
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasComment("账户手机号");
b.Property<string>("ConcurrencyStamp")
.IsConcurrencyToken()
.IsRequired()
@ -583,6 +592,10 @@ namespace JiShe.IoT.Migrations
.HasColumnType("varchar(128)")
.HasComment("设备名称");
b.Property<int?>("DeviceOnlineStatus")
.HasColumnType("int")
.HasComment("设备在线状态");
b.Property<string>("ExtraProperties")
.HasColumnType("longtext")
.HasColumnName("ExtraProperties")
@ -610,6 +623,12 @@ namespace JiShe.IoT.Migrations
.HasColumnType("varchar(50)")
.HasComment("物联网平台中对应的产品Id");
b.Property<string>("IoTPlatformProductName")
.IsRequired()
.HasMaxLength(128)
.HasColumnType("varchar(128)")
.HasComment("物联网平台中对应的产品Name");
b.Property<string>("IoTPlatformResponse")
.HasColumnType("text")
.HasComment("物联网平台返回的响应信息");
@ -621,7 +640,8 @@ namespace JiShe.IoT.Migrations
.HasColumnName("IsDeleted");
b.Property<bool>("IsPlatformPushSuccess")
.HasColumnType("tinyint(1)");
.HasColumnType("tinyint(1)")
.HasComment("物联网平台推送是否成功");
b.Property<DateTime?>("LastModificationTime")
.HasColumnType("datetime(6)")
@ -631,6 +651,14 @@ namespace JiShe.IoT.Migrations
.HasColumnType("char(36)")
.HasColumnName("LastModifierId");
b.Property<DateTime?>("LastOfflineTime")
.HasColumnType("datetime(6)")
.HasComment("最后离线时间");
b.Property<DateTime?>("LastOnlineTime")
.HasColumnType("datetime(6)")
.HasComment("最后在线时间");
b.Property<int?>("OSACreatorId")
.HasColumnType("int")
.HasComment("旧系统授权创建者Id");
@ -1344,6 +1372,11 @@ namespace JiShe.IoT.Migrations
.HasColumnType("int")
.HasComment("接入协议");
b.Property<string>("AccountPhoneNumber")
.HasMaxLength(50)
.HasColumnType("varchar(50)")
.HasComment("账户手机号");
b.Property<string>("Brand")
.HasColumnType("longtext")
.HasComment("产品品牌");