Redis发布订阅实现。
This commit is contained in:
parent
2b5a3a60e2
commit
aa5428d210
@ -1 +1 @@
|
|||||||
Subproject commit b61e2d1f76755f3763fcbc1256abd2d75c94f7a9
|
Subproject commit 798db6667ebd6a37c69877fe8657c268f267bfe3
|
||||||
@ -8,6 +8,7 @@ using JiShe.ServicePro.FreeRedisProvider;
|
|||||||
using JiShe.ServicePro.FreeSqlProvider;
|
using JiShe.ServicePro.FreeSqlProvider;
|
||||||
using JiShe.ServicePro.IoTDBManagement;
|
using JiShe.ServicePro.IoTDBManagement;
|
||||||
using JiShe.ServicePro.OneNETManagement;
|
using JiShe.ServicePro.OneNETManagement;
|
||||||
|
using JiShe.ServicePro.OneNETManagement.Subscribers;
|
||||||
using JiShe.ServicePro.TemplateManagement;
|
using JiShe.ServicePro.TemplateManagement;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Volo.Abp;
|
using Volo.Abp;
|
||||||
@ -58,6 +59,9 @@ namespace JiShe.IoT
|
|||||||
{
|
{
|
||||||
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
|
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
|
||||||
commonService.InitSelectTypetList();
|
commonService.InitSelectTypetList();
|
||||||
|
|
||||||
|
var communicationChannelService = context.ServiceProvider.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
|
||||||
|
communicationChannelService.ServiceCommunicationDeviceStatusSubscriber();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,6 @@
|
|||||||
using JiShe.IoT;
|
using FreeRedis;
|
||||||
|
using FreeSql;
|
||||||
|
using JiShe.IoT;
|
||||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||||||
using JiShe.ServicePro.Consts;
|
using JiShe.ServicePro.Consts;
|
||||||
@ -8,6 +10,7 @@ using JiShe.ServicePro.DeviceManagement.DeviceInfos;
|
|||||||
using JiShe.ServicePro.Dto;
|
using JiShe.ServicePro.Dto;
|
||||||
using JiShe.ServicePro.Enums;
|
using JiShe.ServicePro.Enums;
|
||||||
using JiShe.ServicePro.FreeRedisProvider;
|
using JiShe.ServicePro.FreeRedisProvider;
|
||||||
|
using JiShe.ServicePro.FreeSqlProvider;
|
||||||
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||||||
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
||||||
using JiShe.ServicePro.Kafka.Attributes;
|
using JiShe.ServicePro.Kafka.Attributes;
|
||||||
@ -55,22 +58,40 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
DeviceStatusMessage deviceStatusMessage = null;
|
// 为订阅回调创建独立的 FreeSql 客户端
|
||||||
|
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
||||||
// 订阅频道
|
// 订阅频道
|
||||||
await _redisPubSubService.SubscribeAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
|
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
|
||||||
{
|
{
|
||||||
_logger.LogWarning($"Redis订阅收到消息: {message}");
|
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
|
||||||
|
|
||||||
deviceStatusMessage = message.Deserialize<DeviceStatusMessage>();
|
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
|
||||||
if (deviceStatusMessage == null)
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 设备状态处理
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceStatusMessage"></param>
|
||||||
|
/// <param name="callbackFreeSqlDbContext"></param>
|
||||||
|
/// <returns></returns>
|
||||||
|
/// <exception cref="UserFriendlyException"></exception>
|
||||||
|
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (deviceStatusMessage == null || callbackFreeSqlDbContext == null)
|
||||||
{
|
{
|
||||||
throw new UserFriendlyException($"设备状态消息为空");
|
throw new UserFriendlyException($"设备状态消息处理失败,参数为空");
|
||||||
}
|
}
|
||||||
|
|
||||||
var deviceEntity = await FreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
var deviceEntity = await callbackFreeSqlDbContext.Instance.Select<DeviceManagementInfo>()
|
||||||
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
|
.Where(d => d.IoTPlatform == deviceStatusMessage.IoTPlatform && d.DeviceAddress == deviceStatusMessage.DeviceAddress)
|
||||||
.FirstAsync();
|
.FirstAsync();
|
||||||
if (deviceEntity == null)
|
if (deviceEntity == null)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user