Compare commits
6 Commits
b3d7e3d902
...
28fad6b79b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28fad6b79b | ||
|
|
b083cc5886 | ||
|
|
9ed56fc005 | ||
|
|
aa5428d210 | ||
|
|
2b5a3a60e2 | ||
|
|
79a42085dc |
@ -1 +1 @@
|
|||||||
Subproject commit 3dbd0ebf5fff82ae03945631aa8e401620931ab1
|
Subproject commit 2cc68d62afbcde8e8cc7bfdc57c25ecf6fcbc54e
|
||||||
@ -32,10 +32,11 @@
|
|||||||
"TaskThreadCount": -1
|
"TaskThreadCount": -1
|
||||||
},
|
},
|
||||||
"Pulsar": {
|
"Pulsar": {
|
||||||
"ServiceUrl": "pulsar://192.168.56.101:6650",
|
"ServiceUrl": "pulsar://47.110.60.222:9093",
|
||||||
"WebUrl": "http://192.168.56.101:8080",
|
"WebUrl": "http://47.110.60.222:9094",
|
||||||
|
"UserName": "admin",
|
||||||
"TenantName": "1YMVZZkAkRArjxSD8457",
|
"TenantName": "1YMVZZkAkRArjxSD8457",
|
||||||
"Namespace": "default",
|
"Namespace": "OneNET",
|
||||||
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
|
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
|
||||||
"PulsarSubscriptionCustomName": "sub",
|
"PulsarSubscriptionCustomName": "sub",
|
||||||
"EnableTls": false,
|
"EnableTls": false,
|
||||||
@ -58,8 +59,10 @@
|
|||||||
"IoTDBOptions": {
|
"IoTDBOptions": {
|
||||||
"UserName": "root",
|
"UserName": "root",
|
||||||
"Password": "Lixiao@1980",
|
"Password": "Lixiao@1980",
|
||||||
"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
//"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||||
"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
//"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||||
|
"TreeModelClusterList": [ "47.110.53.196:30710" ],
|
||||||
|
"TableModelClusterList": [ "47.110.53.196:30710" ],
|
||||||
"PoolSize": 32,
|
"PoolSize": 32,
|
||||||
"DataBaseName": "jisheiotdata",
|
"DataBaseName": "jisheiotdata",
|
||||||
"OpenDebugMode": true,
|
"OpenDebugMode": true,
|
||||||
|
|||||||
@ -1,7 +1,4 @@
|
|||||||
using JiShe.ServicePro.Dto;
|
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||||
using JiShe.ServicePro.Kafka.Internal;
|
|
||||||
|
|
||||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 数据通讯通道订阅者服务
|
/// 数据通讯通道订阅者服务
|
||||||
@ -9,10 +6,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|||||||
public interface IServiceCommunicationChannelSubscriberService
|
public interface IServiceCommunicationChannelSubscriberService
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备上下线通知Kafka消息订阅
|
/// 设备上下线通知 Redis 消息订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message);
|
Task ServiceCommunicationDeviceStatusSubscriber();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,6 +8,7 @@ using JiShe.ServicePro.FreeRedisProvider;
|
|||||||
using JiShe.ServicePro.FreeSqlProvider;
|
using JiShe.ServicePro.FreeSqlProvider;
|
||||||
using JiShe.ServicePro.IoTDBManagement;
|
using JiShe.ServicePro.IoTDBManagement;
|
||||||
using JiShe.ServicePro.OneNETManagement;
|
using JiShe.ServicePro.OneNETManagement;
|
||||||
|
using JiShe.ServicePro.OneNETManagement.Subscribers;
|
||||||
using JiShe.ServicePro.TemplateManagement;
|
using JiShe.ServicePro.TemplateManagement;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
@ -57,7 +58,10 @@ namespace JiShe.IoT
|
|||||||
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
public override void OnApplicationInitialization(ApplicationInitializationContext context)
|
||||||
{
|
{
|
||||||
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
|
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
|
||||||
commonService.InitSelectTypetList();
|
commonService.InitSelectTypetList();
|
||||||
|
|
||||||
|
var communicationChannelService = context.ServiceProvider.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
|
||||||
|
communicationChannelService.ServiceCommunicationDeviceStatusSubscriber();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,6 @@
|
|||||||
using JiShe.IoT;
|
using FreeRedis;
|
||||||
|
using FreeSql;
|
||||||
|
using JiShe.IoT;
|
||||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||||||
using JiShe.ServicePro.Consts;
|
using JiShe.ServicePro.Consts;
|
||||||
@ -8,6 +10,7 @@ using JiShe.ServicePro.DeviceManagement.DeviceInfos;
|
|||||||
using JiShe.ServicePro.Dto;
|
using JiShe.ServicePro.Dto;
|
||||||
using JiShe.ServicePro.Enums;
|
using JiShe.ServicePro.Enums;
|
||||||
using JiShe.ServicePro.FreeRedisProvider;
|
using JiShe.ServicePro.FreeRedisProvider;
|
||||||
|
using JiShe.ServicePro.FreeSqlProvider;
|
||||||
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||||||
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
||||||
using JiShe.ServicePro.Kafka.Attributes;
|
using JiShe.ServicePro.Kafka.Attributes;
|
||||||
@ -25,51 +28,85 @@ using Volo.Abp.Auditing;
|
|||||||
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备状态统一订阅订阅服务
|
/// 数据通讯通道订阅者服务
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[DisableAuditing]
|
[DisableAuditing]
|
||||||
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
|
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
|
||||||
{
|
{
|
||||||
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
||||||
private readonly IoTDBOptions _iotDBOptions;
|
private readonly IoTDBOptions _iotDBOptions;
|
||||||
|
private readonly IRedisPubSubService _redisPubSubService;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备状态统一订阅订阅服务
|
/// 设备状态统一订阅订阅服务
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger"></param>
|
/// <param name="logger"></param>
|
||||||
/// <param name="iotDBOptions"></param>
|
/// <param name="iotDBOptions"></param>
|
||||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions)
|
/// <param name="redisPubSubService"></param>
|
||||||
|
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
|
||||||
{
|
{
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_iotDBOptions = iotDBOptions.Value;
|
_iotDBOptions = iotDBOptions.Value;
|
||||||
|
_redisPubSubService = redisPubSubService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 设备上下线通知Kafka消息订阅
|
/// 设备上下线通知 Redis 消息订阅
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="message"></param>
|
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)]
|
public async Task ServiceCommunicationDeviceStatusSubscriber()
|
||||||
public async Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
// 为订阅回调创建独立的 FreeSql 客户端
|
||||||
.Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress)
|
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
||||||
|
// 订阅频道
|
||||||
|
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
|
||||||
|
{
|
||||||
|
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
|
||||||
|
|
||||||
|
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备状态处理
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceStatusMessage"></param>
|
||||||
|
/// <param name="callbackFreeSqlDbContext"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="UserFriendlyException"></exception>
|
||||||
|
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (deviceStatusMessage == null || callbackFreeSqlDbContext == null)
|
||||||
|
{
|
||||||
|
throw new UserFriendlyException($"设备状态消息处理失败,参数为空");
|
||||||
|
}
|
||||||
|
|
||||||
|
var deviceEntity = await callbackFreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||||
|
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
|
||||||
.FirstAsync();
|
.FirstAsync();
|
||||||
if (deviceEntity == null)
|
if (deviceEntity == null)
|
||||||
{
|
{
|
||||||
return SubscribeAck.Fail();
|
throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
|
||||||
}
|
}
|
||||||
|
|
||||||
deviceEntity.DeviceOnlineStatus = message.Status;
|
deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
|
||||||
if (message.Status == DeviceOnlineStatusEnum.Online)
|
if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Online)
|
||||||
{
|
{
|
||||||
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local);
|
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||||
}
|
}
|
||||||
else if (message.Status == DeviceOnlineStatusEnum.Offline)
|
else if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Offline)
|
||||||
{
|
{
|
||||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||||
}
|
}
|
||||||
|
|
||||||
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||||
@ -77,10 +114,11 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|||||||
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
|
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
|
||||||
.ExecuteAffrowsAsync();
|
.ExecuteAffrowsAsync();
|
||||||
|
|
||||||
if (updateResult <=0)
|
if (updateResult <= 0)
|
||||||
{
|
{
|
||||||
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}");
|
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
|
||||||
return SubscribeAck.Fail();
|
|
||||||
|
throw new UserFriendlyException($"设备状态更新失败");
|
||||||
}
|
}
|
||||||
|
|
||||||
//更新设备数据缓存到Redis
|
//更新设备数据缓存到Redis
|
||||||
@ -89,8 +127,6 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|||||||
deviceCacheInfos.PlatformPassword = null;
|
deviceCacheInfos.PlatformPassword = null;
|
||||||
|
|
||||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||||
|
|
||||||
return SubscribeAck.Success();
|
|
||||||
}
|
}
|
||||||
catch (Exception)
|
catch (Exception)
|
||||||
{
|
{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user