Compare commits
6 Commits
f2874d162f
...
f0d271bf97
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0d271bf97 | ||
|
|
916fc31d77 | ||
|
|
9c3387130c | ||
|
|
cada042190 | ||
|
|
12e407f523 | ||
|
|
0d2371d881 |
@ -1 +1 @@
|
||||
Subproject commit 3ef9c78974a2d08b8fd73042951d3481d3d32291
|
||||
Subproject commit 2ed918f8dc2c40f679726fff1d71aea6603362ae
|
||||
@ -45,8 +45,7 @@
|
||||
"OperationTimeout": 30,
|
||||
"KeepAliveInterval": 30,
|
||||
"TaskThreadCount": 1,
|
||||
"IsSubscriber": true,
|
||||
"AdminToken": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiJ9.BzPvlu1lulFJvKofOZSw0FNtGW_edXS4rU32e6lfTX6s4VOh3I-noPRkSdfdF5xE6zR3X_kz4NPuz4Vb5KVLfQ",
|
||||
"IsSubscriber": true,
|
||||
"DefaultPartitions": 16,
|
||||
"DefaultBundles": 16,
|
||||
"EnableAutoCreation": true, //开启自动创建Topic
|
||||
|
||||
@ -1,27 +1,11 @@
|
||||
using FreeRedis;
|
||||
using FreeSql;
|
||||
using JiShe.IoT;
|
||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
|
||||
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
|
||||
using JiShe.ServicePro.Consts;
|
||||
using JiShe.IoT;
|
||||
using JiShe.ServicePro.Core;
|
||||
using JiShe.ServicePro.DataChannelManages;
|
||||
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
|
||||
using JiShe.ServicePro.Dto;
|
||||
using JiShe.ServicePro.Enums;
|
||||
using JiShe.ServicePro.FreeRedisProvider;
|
||||
using JiShe.ServicePro.FreeSqlProvider;
|
||||
using JiShe.ServicePro.IoTDBManagement.DataChannels;
|
||||
using JiShe.ServicePro.IoTDBManagement.TableModels;
|
||||
using JiShe.ServicePro.Kafka.Attributes;
|
||||
using JiShe.ServicePro.Kafka.Consts;
|
||||
using JiShe.ServicePro.Kafka.Internal;
|
||||
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
|
||||
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
|
||||
using JiShe.ServicePro.ServerOptions;
|
||||
using Mapster;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using Volo.Abp;
|
||||
using Volo.Abp.Auditing;
|
||||
|
||||
@ -34,19 +18,16 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
|
||||
{
|
||||
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
|
||||
private readonly IoTDBOptions _iotDBOptions;
|
||||
private readonly IRedisPubSubService _redisPubSubService;
|
||||
private readonly IReliableRedisPubSubService _redisPubSubService;
|
||||
|
||||
/// <summary>
|
||||
/// 设备状态统一订阅订阅服务
|
||||
/// </summary>
|
||||
/// <param name="logger"></param>
|
||||
/// <param name="iotDBOptions"></param>
|
||||
/// <param name="redisPubSubService"></param>
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
|
||||
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService)
|
||||
{
|
||||
_logger = logger;
|
||||
_iotDBOptions = iotDBOptions.Value;
|
||||
_redisPubSubService = redisPubSubService;
|
||||
}
|
||||
|
||||
@ -60,12 +41,25 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
{
|
||||
// 为订阅回调创建独立的 FreeSql 客户端
|
||||
var callbackFreeSqlDbContext = FreeSqlDbContext;
|
||||
var callbackFreeSql = RedisProvider;
|
||||
|
||||
|
||||
// 订阅频道
|
||||
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
|
||||
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
|
||||
{
|
||||
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
|
||||
|
||||
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
try
|
||||
{
|
||||
_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
|
||||
|
||||
HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult();
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
|
||||
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception)
|
||||
@ -80,9 +74,10 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
/// </summary>
|
||||
/// <param name="deviceStatusMessage"></param>
|
||||
/// <param name="callbackFreeSqlDbContext"></param>
|
||||
/// <param name="callbackFreeRedisProvider"></param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="UserFriendlyException"></exception>
|
||||
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
|
||||
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext, IFreeRedisProvider callbackFreeRedisProvider)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -109,7 +104,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
|
||||
}
|
||||
|
||||
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||
var updateResult = await callbackFreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
|
||||
.SetSource(deviceEntity)
|
||||
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
|
||||
.ExecuteAffrowsAsync();
|
||||
@ -126,7 +121,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
|
||||
deviceCacheInfos.IoTPlatformResponse = null;
|
||||
deviceCacheInfos.PlatformPassword = null;
|
||||
|
||||
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||
callbackFreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
|
||||
@ -5,10 +5,10 @@
|
||||
"IoTDBOptions": {
|
||||
"UserName": "root",
|
||||
"Password": "Lixiao@1980",
|
||||
"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||
"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||
//"TreeModelClusterList": [ "192.168.111.37:6667" ],
|
||||
//"TableModelClusterList": [ "192.168.111.37:6667" ],
|
||||
//"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||
//"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
|
||||
"TreeModelClusterList": [ "47.110.53.196:30710" ],
|
||||
"TableModelClusterList": [ "47.110.53.196:30710" ],
|
||||
"PoolSize": 32,
|
||||
"DataBaseName": "jisheiotdata",
|
||||
"OpenDebugMode": true,
|
||||
@ -17,5 +17,41 @@
|
||||
"FreeRedisOptions": {
|
||||
"ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14",
|
||||
"UseDistributedCache": true
|
||||
},
|
||||
"ServerApplicationOptions": {
|
||||
"ServerTagName": "JiSheCollectBus10",
|
||||
"FirstCollectionTime": "2025-04-28 15:07:00",
|
||||
"AutomaticVerificationTime": "16:07:00",
|
||||
"AutomaticTerminalVersionTime": "17:07:00",
|
||||
"AutomaticTelematicsModuleTime": "17:30:00",
|
||||
"AutomaticDayFreezeTime": "02:30:00",
|
||||
"AutomaticMonthFreezeTime": "03:30:00",
|
||||
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
|
||||
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
|
||||
"DistributedMessage": 2
|
||||
},
|
||||
"Pulsar": {
|
||||
"ServiceUrl": "pulsar://47.110.60.222:9093",
|
||||
"WebUrl": "http://47.110.60.222:9094",
|
||||
"UserName": "admin",
|
||||
"TenantName": "1YMVZZkAkRArjxSD8457",
|
||||
"Namespace": "OneNET",
|
||||
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
|
||||
"PulsarSubscriptionCustomName": "sub",
|
||||
"EnableTls": false,
|
||||
"ValidateServerCertificate": false,
|
||||
"ConnectionTimeout": 30,
|
||||
"OperationTimeout": 30,
|
||||
"KeepAliveInterval": 30,
|
||||
"TaskThreadCount": 1,
|
||||
"IsSubscriber": true,
|
||||
"DefaultPartitions": 16,
|
||||
"DefaultBundles": 16,
|
||||
"EnableAutoCreation": true, //开启自动创建Topic
|
||||
"TopicMode": "Static", //Dynamic 主题模式
|
||||
"EnableTopicTypeFilter": true, //允许Topic类型过滤
|
||||
"AllowedTopicTypes": [ "Static" ], //允许的Topic类型
|
||||
"AllowedClusters": [ "pulsar-cluster-1" ], //允许的集群
|
||||
"AdminRoles": [ "admin" ]
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user