Compare commits

..

No commits in common. "f0d271bf97baa3255ec8ae89cbe62bbd51abf7d6" and "f2874d162f22d8b16500b32d96ed2fe7864edad3" have entirely different histories.

4 changed files with 36 additions and 66 deletions

@ -1 +1 @@
Subproject commit 2ed918f8dc2c40f679726fff1d71aea6603362ae Subproject commit 3ef9c78974a2d08b8fd73042951d3481d3d32291

View File

@ -46,6 +46,7 @@
"KeepAliveInterval": 30, "KeepAliveInterval": 30,
"TaskThreadCount": 1, "TaskThreadCount": 1,
"IsSubscriber": true, "IsSubscriber": true,
"AdminToken": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiJ9.BzPvlu1lulFJvKofOZSw0FNtGW_edXS4rU32e6lfTX6s4VOh3I-noPRkSdfdF5xE6zR3X_kz4NPuz4Vb5KVLfQ",
"DefaultPartitions": 16, "DefaultPartitions": 16,
"DefaultBundles": 16, "DefaultBundles": 16,
"EnableAutoCreation": true, //Topic "EnableAutoCreation": true, //Topic

View File

@ -1,11 +1,27 @@
using JiShe.IoT; using FreeRedis;
using FreeSql;
using JiShe.IoT;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts;
using JiShe.ServicePro.Core; using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos; using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto; using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider; using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider; using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.Kafka.Attributes;
using JiShe.ServicePro.Kafka.Consts;
using JiShe.ServicePro.Kafka.Internal;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
using JiShe.ServicePro.ServerOptions;
using Mapster; using Mapster;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp; using Volo.Abp;
using Volo.Abp.Auditing; using Volo.Abp.Auditing;
@ -18,16 +34,19 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{ {
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger; private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
private readonly IReliableRedisPubSubService _redisPubSubService; private readonly IoTDBOptions _iotDBOptions;
private readonly IRedisPubSubService _redisPubSubService;
/// <summary> /// <summary>
/// 设备状态统一订阅订阅服务 /// 设备状态统一订阅订阅服务
/// </summary> /// </summary>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="iotDBOptions"></param>
/// <param name="redisPubSubService"></param> /// <param name="redisPubSubService"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService) public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
{ {
_logger = logger; _logger = logger;
_iotDBOptions = iotDBOptions.Value;
_redisPubSubService = redisPubSubService; _redisPubSubService = redisPubSubService;
} }
@ -41,25 +60,12 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
{ {
// 为订阅回调创建独立的 FreeSql 客户端 // 为订阅回调创建独立的 FreeSql 客户端
var callbackFreeSqlDbContext = FreeSqlDbContext; var callbackFreeSqlDbContext = FreeSqlDbContext;
var callbackFreeSql = RedisProvider;
// 订阅频道 // 订阅频道
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) => await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
{ {
try _logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult(); HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}
catch (Exception ex)
{
_logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
return false;
}
}); });
} }
catch (Exception) catch (Exception)
@ -74,10 +80,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
/// </summary> /// </summary>
/// <param name="deviceStatusMessage"></param> /// <param name="deviceStatusMessage"></param>
/// <param name="callbackFreeSqlDbContext"></param> /// <param name="callbackFreeSqlDbContext"></param>
/// <param name="callbackFreeRedisProvider"></param>
/// <returns></returns> /// <returns></returns>
/// <exception cref="UserFriendlyException"></exception> /// <exception cref="UserFriendlyException"></exception>
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext, IFreeRedisProvider callbackFreeRedisProvider) private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
{ {
try try
{ {
@ -104,7 +109,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local); deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
} }
var updateResult = await callbackFreeSqlDbContext.Instance.Update<DeviceManagementInfo>() var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
.SetSource(deviceEntity) .SetSource(deviceEntity)
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime }) .UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync(); .ExecuteAffrowsAsync();
@ -121,7 +126,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceCacheInfos.IoTPlatformResponse = null; deviceCacheInfos.IoTPlatformResponse = null;
deviceCacheInfos.PlatformPassword = null; deviceCacheInfos.PlatformPassword = null;
callbackFreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos); RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
} }
catch (Exception) catch (Exception)
{ {

View File

@ -5,10 +5,10 @@
"IoTDBOptions": { "IoTDBOptions": {
"UserName": "root", "UserName": "root",
"Password": "Lixiao@1980", "Password": "Lixiao@1980",
//"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], "TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ], "TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TreeModelClusterList": [ "47.110.53.196:30710" ], //"TreeModelClusterList": [ "192.168.111.37:6667" ],
"TableModelClusterList": [ "47.110.53.196:30710" ], //"TableModelClusterList": [ "192.168.111.37:6667" ],
"PoolSize": 32, "PoolSize": 32,
"DataBaseName": "jisheiotdata", "DataBaseName": "jisheiotdata",
"OpenDebugMode": true, "OpenDebugMode": true,
@ -17,41 +17,5 @@
"FreeRedisOptions": { "FreeRedisOptions": {
"ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14", "ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14",
"UseDistributedCache": true "UseDistributedCache": true
},
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus10",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00",
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"DistributedMessage": 2
},
"Pulsar": {
"ServiceUrl": "pulsar://47.110.60.222:9093",
"WebUrl": "http://47.110.60.222:9094",
"UserName": "admin",
"TenantName": "1YMVZZkAkRArjxSD8457",
"Namespace": "OneNET",
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
"PulsarSubscriptionCustomName": "sub",
"EnableTls": false,
"ValidateServerCertificate": false,
"ConnectionTimeout": 30,
"OperationTimeout": 30,
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic
"TopicMode": "Static", //Dynamic
"EnableTopicTypeFilter": true, //Topic
"AllowedTopicTypes": [ "Static" ], //Topic
"AllowedClusters": [ "pulsar-cluster-1" ], //
"AdminRoles": [ "admin" ]
} }
} }