优化Redis订阅发布组件,统一管理分布式消息中心主题,新增实现高性能Task任务管理中心
This commit is contained in:
parent
f349e8a9bd
commit
2fcf9d854a
@ -5,7 +5,6 @@ using JiShe.ServicePro.Dto;
|
||||
using JiShe.ServicePro.Encrypt;
|
||||
using JiShe.ServicePro.Enums;
|
||||
using JiShe.ServicePro.FreeRedisProvider;
|
||||
using JiShe.ServicePro.Kafka.Consts;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace JiShe.IoT.BusinessSystemAggregation
|
||||
@ -65,11 +64,11 @@ namespace JiShe.IoT.BusinessSystemAggregation
|
||||
//将指令存储Kafka的OneNET主题中
|
||||
if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.OneNET)
|
||||
{
|
||||
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, input);
|
||||
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, input);
|
||||
}
|
||||
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
|
||||
{
|
||||
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName, input);
|
||||
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName, input);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@ -7,9 +7,6 @@ using JiShe.ServicePro.DeviceManagement.Permissions;
|
||||
using JiShe.ServicePro.Dto;
|
||||
using JiShe.ServicePro.Enums;
|
||||
using JiShe.ServicePro.FreeRedisProvider;
|
||||
using JiShe.ServicePro.Kafka.Consts;
|
||||
using JiShe.ServicePro.Kafka.Producer;
|
||||
using JiShe.ServicePro.OneNET.Provider.OpenApiModels.Devices;
|
||||
using JiShe.ServicePro.OneNETManagement.OneNETDevices;
|
||||
using JiShe.ServicePro.OneNETManagement.OneNETProducts;
|
||||
using Mapster;
|
||||
@ -300,7 +297,7 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
}
|
||||
else if (deviceInfo.IoTPlatform == IoTPlatformTypeEnum.CTWing)
|
||||
{
|
||||
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.CTWingAepCommandIssuedEventName,commandRequest);
|
||||
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.CTWingAepCommandIssuedEventName,commandRequest);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
@ -599,7 +596,7 @@ namespace JiShe.IoT.DeviceAggregation
|
||||
throw new UserFriendlyException("设备不在线");
|
||||
}
|
||||
|
||||
await redisPubSubService.PublishReliableAsync(KafkaTopicConsts.OneNETCommandIssuedEventName, commandRequest);
|
||||
await redisPubSubService.PublishReliableAsync(DistributedMessageCenterConst.OneNETCommandIssuedEventName, commandRequest);
|
||||
return true;
|
||||
}
|
||||
catch (Exception)
|
||||
|
||||
@ -60,8 +60,8 @@ namespace JiShe.IoT
|
||||
var commonService = context.ServiceProvider.GetRequiredService<ICommonService>();
|
||||
commonService.InitSelectTypetList();
|
||||
|
||||
var issueSubscriberService = context.ServiceProvider.GetRequiredService<IOneNetIssueSubscriberService>();
|
||||
issueSubscriberService.IssueCommandRedisSubscriber();
|
||||
var serviceCommunicationChannelSubscriberService = context.ServiceProvider.GetRequiredService<IServiceCommunicationChannelSubscriberService>();
|
||||
serviceCommunicationChannelSubscriberService.ServiceCommunicationDeviceStatusSubscriber();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,21 +45,10 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
|
||||
|
||||
// 订阅频道
|
||||
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
|
||||
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(DistributedMessageCenterConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
//_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
|
||||
|
||||
HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
await HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql);
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
|
||||
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user