完善IoTDB表,完善redis发布订阅

This commit is contained in:
ChenYi 2025-08-15 10:43:14 +08:00
parent f2874d162f
commit 0d2371d881
3 changed files with 58 additions and 32 deletions

View File

@ -46,7 +46,6 @@
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"AdminToken": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiJ9.BzPvlu1lulFJvKofOZSw0FNtGW_edXS4rU32e6lfTX6s4VOh3I-noPRkSdfdF5xE6zR3X_kz4NPuz4Vb5KVLfQ",
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic

View File

@ -1,27 +1,11 @@
using FreeRedis;
using FreeSql;
using JiShe.IoT;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts;
using JiShe.IoT;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.Kafka.Attributes;
using JiShe.ServicePro.Kafka.Consts;
using JiShe.ServicePro.Kafka.Internal;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
using JiShe.ServicePro.ServerOptions;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp;
using Volo.Abp.Auditing;
@ -34,19 +18,16 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
private readonly IoTDBOptions _iotDBOptions;
private readonly IRedisPubSubService _redisPubSubService;
private readonly IReliableRedisPubSubService _redisPubSubService;
/// <summary>
/// 设备状态统一订阅订阅服务
/// </summary>
/// <param name="logger"></param>
/// <param name="iotDBOptions"></param>
/// <param name="redisPubSubService"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService)
{
_logger = logger;
_iotDBOptions = iotDBOptions.Value;
_redisPubSubService = redisPubSubService;
}
@ -61,11 +42,21 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
// 为订阅回调创建独立的 FreeSql 客户端
var callbackFreeSqlDbContext = FreeSqlDbContext;
// 订阅频道
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
try
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}
catch (Exception ex)
{
_logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
return false;
}
});
}
catch (Exception)

View File

@ -5,10 +5,10 @@
"IoTDBOptions": {
"UserName": "root",
"Password": "Lixiao@1980",
"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TreeModelClusterList": [ "192.168.111.37:6667" ],
//"TableModelClusterList": [ "192.168.111.37:6667" ],
//"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TreeModelClusterList": [ "47.110.53.196:30710" ],
"TableModelClusterList": [ "47.110.53.196:30710" ],
"PoolSize": 32,
"DataBaseName": "jisheiotdata",
"OpenDebugMode": true,
@ -17,5 +17,41 @@
"FreeRedisOptions": {
"ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14",
"UseDistributedCache": true
},
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus10",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00",
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"DistributedMessage": 2
},
"Pulsar": {
"ServiceUrl": "pulsar://47.110.60.222:9093",
"WebUrl": "http://47.110.60.222:9094",
"UserName": "admin",
"TenantName": "1YMVZZkAkRArjxSD8457",
"Namespace": "OneNET",
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
"PulsarSubscriptionCustomName": "sub",
"EnableTls": false,
"ValidateServerCertificate": false,
"ConnectionTimeout": 30,
"OperationTimeout": 30,
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic
"TopicMode": "Static", //Dynamic
"EnableTopicTypeFilter": true, //Topic
"AllowedTopicTypes": [ "Static" ], //Topic
"AllowedClusters": [ "pulsar-cluster-1" ], //
"AdminRoles": [ "admin" ]
}
}