Compare commits

..

6 Commits

Author SHA1 Message Date
ChenYi
28fad6b79b 更新 2025-08-14 17:40:44 +08:00
ChenYi
b083cc5886 更新 2025-08-14 17:39:41 +08:00
ChenYi
9ed56fc005 Redis 发布订阅可靠版本 2025-08-14 17:39:26 +08:00
ChenYi
aa5428d210 Redis发布订阅实现。 2025-08-14 14:40:32 +08:00
ChenYi
2b5a3a60e2 完善Redis组件,支持发布订阅 2025-08-14 11:45:58 +08:00
ChenYi
79a42085dc 完善Pulsar鉴权 2025-08-14 10:21:49 +08:00
5 changed files with 75 additions and 36 deletions

@ -1 +1 @@
Subproject commit 3dbd0ebf5fff82ae03945631aa8e401620931ab1 Subproject commit 2cc68d62afbcde8e8cc7bfdc57c25ecf6fcbc54e

View File

@ -32,10 +32,11 @@
"TaskThreadCount": -1 "TaskThreadCount": -1
}, },
"Pulsar": { "Pulsar": {
"ServiceUrl": "pulsar://192.168.56.101:6650", "ServiceUrl": "pulsar://47.110.60.222:9093",
"WebUrl": "http://192.168.56.101:8080", "WebUrl": "http://47.110.60.222:9094",
"UserName": "admin",
"TenantName": "1YMVZZkAkRArjxSD8457", "TenantName": "1YMVZZkAkRArjxSD8457",
"Namespace": "default", "Namespace": "OneNET",
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79", "PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
"PulsarSubscriptionCustomName": "sub", "PulsarSubscriptionCustomName": "sub",
"EnableTls": false, "EnableTls": false,
@ -58,8 +59,10 @@
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
"Password": "Lixiao@1980", "Password": "Lixiao@1980",
"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], //"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], //"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TreeModelClusterList": [ "47.110.53.196:30710" ],
"TableModelClusterList": [ "47.110.53.196:30710" ],
"PoolSize": 32, "PoolSize": 32,
"DataBaseName": "jisheiotdata", "DataBaseName": "jisheiotdata",
"OpenDebugMode": true, "OpenDebugMode": true,

View File

@ -1,7 +1,4 @@
using JiShe.ServicePro.Dto; namespace JiShe.ServicePro.OneNETManagement.Subscribers
using JiShe.ServicePro.Kafka.Internal;
namespace JiShe.ServicePro.OneNETManagement.Subscribers
{ {
/// <summary> /// <summary>
/// 数据通讯通道订阅者服务 /// 数据通讯通道订阅者服务
@ -9,10 +6,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public interface IServiceCommunicationChannelSubscriberService public interface IServiceCommunicationChannelSubscriberService
{ {
/// <summary> /// <summary>
/// 设备上下线通知Kafka消息订阅 /// 设备上下线通知 Redis 消息订阅
/// </summary> /// </summary>
/// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message); Task ServiceCommunicationDeviceStatusSubscriber();
} }
} }

View File

@ -8,6 +8,7 @@ using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider; using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement; using JiShe.ServicePro.IoTDBManagement;
using JiShe.ServicePro.OneNETManagement; using JiShe.ServicePro.OneNETManagement;
using JiShe.ServicePro.OneNETManagement.Subscribers;
using JiShe.ServicePro.TemplateManagement; using JiShe.ServicePro.TemplateManagement;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Volo.Abp; using Volo.Abp;
@ -57,7 +58,10 @@ namespace JiShe.IoT
public override void OnApplicationInitialization(ApplicationInitializationContext context) public override void OnApplicationInitialization(ApplicationInitializationContext context)
{ {
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>(); var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
commonService.InitSelectTypetList(); commonService.InitSelectTypetList();
var communicationChannelService = context.ServiceProvider.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
communicationChannelService.ServiceCommunicationDeviceStatusSubscriber();
} }
} }
} }

View File

@ -1,4 +1,6 @@
using JiShe.IoT; using FreeRedis;
using FreeSql;
using JiShe.IoT;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model; using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options; using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts; using JiShe.ServicePro.Consts;
@ -8,6 +10,7 @@ using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto; using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums; using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels; using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels; using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.Kafka.Attributes; using JiShe.ServicePro.Kafka.Attributes;
@ -25,51 +28,85 @@ using Volo.Abp.Auditing;
namespace JiShe.ServicePro.OneNETManagement.Subscribers namespace JiShe.ServicePro.OneNETManagement.Subscribers
{ {
/// <summary> /// <summary>
/// 设备状态统一订阅订阅服务 /// 数据通讯通道订阅者服务
/// </summary> /// </summary>
[DisableAuditing] [DisableAuditing]
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{ {
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger; private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
private readonly IoTDBOptions _iotDBOptions; private readonly IoTDBOptions _iotDBOptions;
private readonly IRedisPubSubService _redisPubSubService;
/// <summary> /// <summary>
/// 设备状态统一订阅订阅服务 /// 设备状态统一订阅订阅服务
/// </summary> /// </summary>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="iotDBOptions"></param> /// <param name="iotDBOptions"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions) /// <param name="redisPubSubService"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
{ {
_logger = logger; _logger = logger;
_iotDBOptions = iotDBOptions.Value; _iotDBOptions = iotDBOptions.Value;
_redisPubSubService = redisPubSubService;
} }
/// <summary> /// <summary>
/// 设备上下线通知Kafka消息订阅 /// 设备上下线通知 Redis 消息订阅
/// </summary> /// </summary>
/// <param name="message"></param>
/// <returns></returns> /// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationDeviceStatusEventName)] public async Task ServiceCommunicationDeviceStatusSubscriber()
public async Task<ISubscribeAck> ServiceCommunicationDeviceStatusSubscriber(DeviceStatusMessage message)
{ {
try try
{ {
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>() // 为订阅回调创建独立的 FreeSql 客户端
.Where(d => d.IoTPlatform == message.IoTPlatform && d.DeviceAddress == message.DeviceAddress) var callbackFreeSqlDbContext = FreeSqlDbContext;
// 订阅频道
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
});
}
catch (Exception)
{
throw;
}
}
/// <summary>
/// 设备状态处理
/// </summary>
/// <param name="deviceStatusMessage"></param>
/// <param name="callbackFreeSqlDbContext"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
{
try
{
if (deviceStatusMessage == null || callbackFreeSqlDbContext == null)
{
throw new UserFriendlyException($"设备状态消息处理失败,参数为空");
}
var deviceEntity = await callbackFreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
.FirstAsync(); .FirstAsync();
if (deviceEntity == null) if (deviceEntity == null)
{ {
return SubscribeAck.Fail(); throw new UserFriendlyException($"{deviceStatusMessage.DeviceAddress} 设备不存在");
} }
deviceEntity.DeviceOnlineStatus = message.Status; deviceEntity.DeviceOnlineStatus = deviceStatusMessage.Status;
if (message.Status == DeviceOnlineStatusEnum.Online) if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Online)
{ {
deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds,DateTimeKind.Local); deviceEntity.LastOnlineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
} }
else if (message.Status == DeviceOnlineStatusEnum.Offline) else if (deviceStatusMessage.Status == DeviceOnlineStatusEnum.Offline)
{ {
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(message.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
} }
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>() var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
@ -77,10 +114,11 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime }) .UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync(); .ExecuteAffrowsAsync();
if (updateResult <=0) if (updateResult <= 0)
{ {
_logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{message.Serialize()}"); _logger.LogError($"{nameof(ServiceCommunicationDeviceStatusSubscriber)} 设备状态更新失败{deviceStatusMessage.Serialize()}");
return SubscribeAck.Fail();
throw new UserFriendlyException($"设备状态更新失败");
} }
//更新设备数据缓存到Redis //更新设备数据缓存到Redis
@ -89,8 +127,6 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceCacheInfos.PlatformPassword = null; deviceCacheInfos.PlatformPassword = null;
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos); RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
return SubscribeAck.Success();
} }
catch (Exception) catch (Exception)
{ {