Compare commits

..

No commits in common. "f0d271bf97baa3255ec8ae89cbe62bbd51abf7d6" and "f2874d162f22d8b16500b32d96ed2fe7864edad3" have entirely different histories.

4 changed files with 36 additions and 66 deletions

@ -1 +1 @@
Subproject commit 2ed918f8dc2c40f679726fff1d71aea6603362ae
Subproject commit 3ef9c78974a2d08b8fd73042951d3481d3d32291

View File

@ -45,7 +45,8 @@
"OperationTimeout": 30,
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"IsSubscriber": true,
"AdminToken": "eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJhZG1pbiJ9.BzPvlu1lulFJvKofOZSw0FNtGW_edXS4rU32e6lfTX6s4VOh3I-noPRkSdfdF5xE6zR3X_kz4NPuz4Vb5KVLfQ",
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic

View File

@ -1,11 +1,27 @@
using JiShe.IoT;
using FreeRedis;
using FreeSql;
using JiShe.IoT;
using JiShe.ServicePro.ApacheIoTDB.Provider.Model;
using JiShe.ServicePro.ApacheIoTDB.Provider.Options;
using JiShe.ServicePro.Consts;
using JiShe.ServicePro.Core;
using JiShe.ServicePro.DataChannelManages;
using JiShe.ServicePro.DeviceManagement.DeviceInfos;
using JiShe.ServicePro.Dto;
using JiShe.ServicePro.Enums;
using JiShe.ServicePro.FreeRedisProvider;
using JiShe.ServicePro.FreeSqlProvider;
using JiShe.ServicePro.IoTDBManagement.DataChannels;
using JiShe.ServicePro.IoTDBManagement.TableModels;
using JiShe.ServicePro.Kafka.Attributes;
using JiShe.ServicePro.Kafka.Consts;
using JiShe.ServicePro.Kafka.Internal;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels;
using JiShe.ServicePro.OneNET.Provider.ReceiveModels.ThingModeData;
using JiShe.ServicePro.ServerOptions;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp;
using Volo.Abp.Auditing;
@ -18,16 +34,19 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
public class ServiceCommunicationChannelSubscriberService : IoTAppService, IServiceCommunicationChannelSubscriberService
{
private readonly ILogger<ServiceCommunicationChannelSubscriberService> _logger;
private readonly IReliableRedisPubSubService _redisPubSubService;
private readonly IoTDBOptions _iotDBOptions;
private readonly IRedisPubSubService _redisPubSubService;
/// <summary>
/// 设备状态统一订阅订阅服务
/// </summary>
/// <param name="logger"></param>
/// <param name="iotDBOptions"></param>
/// <param name="redisPubSubService"></param>
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IReliableRedisPubSubService redisPubSubService)
public ServiceCommunicationChannelSubscriberService(ILogger<ServiceCommunicationChannelSubscriberService> logger, IOptions<IoTDBOptions> iotDBOptions, IRedisPubSubService redisPubSubService)
{
_logger = logger;
_iotDBOptions = iotDBOptions.Value;
_redisPubSubService = redisPubSubService;
}
@ -41,25 +60,12 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
{
// 为订阅回调创建独立的 FreeSql 客户端
var callbackFreeSqlDbContext = FreeSqlDbContext;
var callbackFreeSql = RedisProvider;
// 订阅频道
await _redisPubSubService.SubscribeReliableAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, async (message) =>
await _redisPubSubService.SubscribeAsync<DeviceStatusMessage>(RedisConst.ServiceCommunicationDeviceStatusEventName, (message) =>
{
try
{
_logger.LogWarning($"Redis订阅收到设备状态消息: {message.Serialize()}");
HandDeviceStatus(message, callbackFreeSqlDbContext, callbackFreeSql).ConfigureAwait(false).GetAwaiter().GetResult();
return true;
}
catch (Exception ex)
{
_logger.LogWarning($"处理Redis订阅设备状态消息发生异常:{ex.Message},数据写入死信队列,消息体: {message.Serialize()}");
await _redisPubSubService.PublishReliableAsync(RedisConst.ServiceCommunicationDeviceStatusDLQ, message);
return false;
}
_logger.LogWarning($"Redis订阅收到设备状态消息: {message}");
HandDeviceStatus(message, callbackFreeSqlDbContext).ConfigureAwait(false).GetAwaiter().GetResult();
});
}
catch (Exception)
@ -74,10 +80,9 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
/// </summary>
/// <param name="deviceStatusMessage"></param>
/// <param name="callbackFreeSqlDbContext"></param>
/// <param name="callbackFreeRedisProvider"></param>
/// <returns></returns>
/// <exception cref="UserFriendlyException"></exception>
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext, IFreeRedisProvider callbackFreeRedisProvider)
private async Task HandDeviceStatus(DeviceStatusMessage deviceStatusMessage, IFreeSqlProvider callbackFreeSqlDbContext)
{
try
{
@ -104,7 +109,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceEntity.LastOfflineTime = TimestampHelper.ConvertToDateTime(deviceStatusMessage.ReceivedTime, TimestampUnit.Milliseconds, DateTimeKind.Local);
}
var updateResult = await callbackFreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
var updateResult = await FreeSqlDbContext.Instance.Update<DeviceManagementInfo>()
.SetSource(deviceEntity)
.UpdateColumns(a => new { a.DeviceOnlineStatus, a.LastOnlineTime, a.LastOfflineTime })
.ExecuteAffrowsAsync();
@ -121,7 +126,7 @@ namespace JiShe.ServicePro.OneNETManagement.Subscribers
deviceCacheInfos.IoTPlatformResponse = null;
deviceCacheInfos.PlatformPassword = null;
callbackFreeRedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
RedisProvider.Instance.HSet<DeviceCacheInfos>(RedisConst.CacheAllDeviceInfoHashKey, deviceEntity.DeviceAddress, deviceCacheInfos);
}
catch (Exception)
{

View File

@ -5,10 +5,10 @@
"IoTDBOptions": {
"UserName": "root",
"Password": "Lixiao@1980",
//"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TreeModelClusterList": [ "47.110.53.196:30710" ],
"TableModelClusterList": [ "47.110.53.196:30710" ],
"TreeModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
"TableModelClusterList": [ "47.110.53.196:6667", "47.110.60.222:6667", "47.110.62.104:6667" ],
//"TreeModelClusterList": [ "192.168.111.37:6667" ],
//"TableModelClusterList": [ "192.168.111.37:6667" ],
"PoolSize": 32,
"DataBaseName": "jisheiotdata",
"OpenDebugMode": true,
@ -17,41 +17,5 @@
"FreeRedisOptions": {
"ConnectionString": "47.110.53.196:6379,password=1q3J@BGf!yhTaD46nS#,abortConnect=false,connectTimeout=30000,allowAdmin=true,maxPoolSize=500,defaultdatabase=14",
"UseDistributedCache": true
},
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus10",
"FirstCollectionTime": "2025-04-28 15:07:00",
"AutomaticVerificationTime": "16:07:00",
"AutomaticTerminalVersionTime": "17:07:00",
"AutomaticTelematicsModuleTime": "17:30:00",
"AutomaticDayFreezeTime": "02:30:00",
"AutomaticMonthFreezeTime": "03:30:00",
"DefaultProtocolPlugin": "T37612012ProtocolPlugin",
"VerifySignatureToken": "SIcPQnpMgaFDmNlIjNmzq5smshz7cKrh",
"DistributedMessage": 2
},
"Pulsar": {
"ServiceUrl": "pulsar://47.110.60.222:9093",
"WebUrl": "http://47.110.60.222:9094",
"UserName": "admin",
"TenantName": "1YMVZZkAkRArjxSD8457",
"Namespace": "OneNET",
"PulsarSecretKey": "0fd7afb8b0d04e6abc4fdfdac2190a79",
"PulsarSubscriptionCustomName": "sub",
"EnableTls": false,
"ValidateServerCertificate": false,
"ConnectionTimeout": 30,
"OperationTimeout": 30,
"KeepAliveInterval": 30,
"TaskThreadCount": 1,
"IsSubscriber": true,
"DefaultPartitions": 16,
"DefaultBundles": 16,
"EnableAutoCreation": true, //Topic
"TopicMode": "Static", //Dynamic
"EnableTopicTypeFilter": true, //Topic
"AllowedTopicTypes": [ "Static" ], //Topic
"AllowedClusters": [ "pulsar-cluster-1" ], //
"AdminRoles": [ "admin" ]
}
}