diff --git a/host/JiShe.IoT.HttpApi.Host/appsettings.Development.json b/host/JiShe.IoT.HttpApi.Host/appsettings.Development.json index 8d78f85..dbf1416 100644 --- a/host/JiShe.IoT.HttpApi.Host/appsettings.Development.json +++ b/host/JiShe.IoT.HttpApi.Host/appsettings.Development.json @@ -45,8 +45,7 @@ "OperationTimeout": 30, "KeepAliveInterval": 30, "TaskThreadCount": 1, - "IsSubscriber": true, - "AdminToken": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiJ9.BzPvlu1lulFJvKofOZSw0FNtGW_edXS4rU32e6lfTX6s4VOh3I-noPRkSdfdF5xE6zR3X_kz4NPuz4Vb5KVLfQ", + "IsSubscriber": true, "DefaultPartitions": 16, "DefaultBundles": 16, "EnableAutoCreation": true, //开启自动创建Topic diff --git a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs index a82e3ed..b436cb6 100644 --- a/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs +++ b/src/JiShe.IoT.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -1,27 +1,11 @@ -using FreeRedis; -using FreeSql; -using JiShe.IoT; -using JiShe.ServicePro.ApacheIoTDB.Provider.Model; -using JiShe.ServicePro.ApacheIoTDB.Provider.Options; -using JiShe.ServicePro.Consts; +using JiShe.IoT; using JiShe.ServicePro.Core; -using JiShe.ServicePro.DataChannelManages; using JiShe.ServicePro.DeviceManagement.DeviceInfos; using JiShe.ServicePro.Dto; -using JiShe.ServicePro.Enums; using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeSqlProvider; -using JiShe.ServicePro.IoTDBManagement.DataChannels; -using JiShe.ServicePro.IoTDBManagement.TableModels; -using JiShe.ServicePro.Kafka.Attributes; -using JiShe.ServicePro.Kafka.Consts; -using JiShe.ServicePro.Kafka.Internal; -using JiShe.ServicePro.OneNET.Provider.ReceiveModels; -using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData; -using JiShe.ServicePro.ServerOptions; using Mapster; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; using Volo.Abp; using Volo.Abp.Auditing; @@ -34,19 +18,16 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService { private readonly ILogger _logger; - private readonly IoTDBOptions _iotDBOptions; - private readonly IRedisPubSubService _redisPubSubService; + private readonly IReliableRedisPubSubService _redisPubSubService; /// /// 设备状态统一订阅订阅服务 /// /// - /// /// - public ServiceCommunicationChannelSubscriberService(ILogger logger, IOptions iotDBOptions, IRedisPubSubService redisPubSubService) + public ServiceCommunicationChannelSubscriberService(ILogger logger, IReliableRedisPubSubService redisPubSubService) { _logger = logger; - _iotDBOptions = iotDBOptions.Value; _redisPubSubService = redisPubSubService; } @@ -61,11 +42,21 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers // 为订阅回调创建独立的 FreeSql 客户端 var callbackFreeSqlDbContext = FreeSqlDbContext; // 订阅频道 - await _redisPubSubService.SubscribeAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) => + await _redisPubSubService.SubscribeReliableAsync(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) => { - _logger.LogWarning($"Redis订阅收到设备状态消息: {message}"); - - HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult(); + try + { + _logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}"); + + HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult(); + return true; + } + catch (Exception ex) + { + _logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}"); + await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message); + return false; + } }); } catch (Exception) diff --git a/src/JiShe.IoT.DbMigrator/appsettings.json b/src/JiShe.IoT.DbMigrator/appsettings.json index dc7a490..31f534c 100644 --- a/src/JiShe.IoT.DbMigrator/appsettings.json +++ b/src/JiShe.IoT.DbMigrator/appsettings.json @@ -5,10 +5,10 @@ "IoTDBOptions": { "UserName": "root", "Password": "Lixiao@1980", - "TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], - "TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], - //"TreeModelClusterList": [ "192.168.111.37:6667" ], - //"TableModelClusterList": [ "192.168.111.37:6667" ], + //"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], + //"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], + "TreeModelClusterList": [ "47.110.53.196:30710" ], + "TableModelClusterList": [ "47.110.53.196:30710" ], "PoolSize": 32, "DataBaseName": "jisheiotdata", "OpenDebugMode": true, @@ -17,5 +17,41 @@ "FreeRedisOptions": { "ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14", "UseDistributedCache": true + }, + "ServerApplicationOptions": { + "ServerTagName": "JiSheCollectBus10", + "FirstCollectionTime": "2025-04-28 15:07:00", + "AutomaticVerificationTime": "16:07:00", + "AutomaticTerminalVersionTime": "17:07:00", + "AutomaticTelematicsModuleTime": "17:30:00", + "AutomaticDayFreezeTime": "02:30:00", + "AutomaticMonthFreezeTime": "03:30:00", + "DefaultProtocolPlugin": "T37612012ProtocolPlugin", + "VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh", + "DistributedMessage": 2 + }, + "Pulsar": { + "ServiceUrl": "pulsar://47.110.60.222:9093", + "WebUrl": "http://47.110.60.222:9094", + "UserName": "admin", + "TenantName": "1YMVZZkAkRArjxSD8457", + "Namespace": "OneNET", + "PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79", + "PulsarSubscriptionCustomName": "sub", + "EnableTls": false, + "ValidateServerCertificate": false, + "ConnectionTimeout": 30, + "OperationTimeout": 30, + "KeepAliveInterval": 30, + "TaskThreadCount": 1, + "IsSubscriber": true, + "DefaultPartitions": 16, + "DefaultBundles": 16, + "EnableAutoCreation": true, //开启自动创建Topic + "TopicMode": "Static", //Dynamic 主题模式 + "EnableTopicTypeFilter": true, //允许Topic类型过滤 + "AllowedTopicTypes": [ "Static" ], //允许的Topic类型 + "AllowedClusters": [ "pulsar-cluster-1" ], //允许的集群 + "AdminRoles": [ "admin" ] } } \ No newline at end of file