添加测试下发

This commit is contained in:
zenghongyao 2025-05-08 11:25:08 +08:00
parent a5ed0a37a0
commit 245c91c9e3
5 changed files with 71 additions and 12 deletions

View File

@ -14,3 +14,12 @@ public class SampleDto2
{ {
public int Value { get; set; } public int Value { get; set; }
} }
public class KafkaSendDto
{
public string Address { get; set; }
public string Frame { get; set; }
}

View File

@ -100,16 +100,17 @@ namespace JiShe.CollectBus.Plugins
{ {
//todo: 删除Redis缓存 //todo: 删除Redis缓存
var tcpSessionClient = (ITcpSessionClient)client; var tcpSessionClient = (ITcpSessionClient)client;
var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id); //var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
if (entity != null) //if (entity != null)
{ //{
entity.UpdateByOnClosed(); // entity.UpdateByOnClosed();
await _deviceRepository.UpdateAsync(entity); // await _deviceRepository.UpdateAsync(entity);
} //}
else //else
{ //{
_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败"); //_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
} //}
_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接");
await e.InvokeNext(); await e.InvokeNext();
} }

View File

@ -14,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.IotSystems.PrepayModel;
using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Attributes;
using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
@ -24,6 +25,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Tasks; using System.Threading.Tasks;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Samples; namespace JiShe.CollectBus.Samples;
@ -35,14 +37,18 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
private readonly IoTDbOptions _options; private readonly IoTDbOptions _options;
private readonly IRedisDataCacheService _redisDataCacheService; private readonly IRedisDataCacheService _redisDataCacheService;
private readonly IProducerService _producerService;
private readonly ITcpService _tcpService;
public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options, public SampleAppService(IIoTDbProvider iotDBProvider, IOptions<IoTDbOptions> options,
IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService) IoTDBRuntimeContext dbContext, ILogger<SampleAppService> logger, IRedisDataCacheService redisDataCacheService, IProducerService producerService, ITcpService tcpService)
{ {
_iotDBProvider = iotDBProvider; _iotDBProvider = iotDBProvider;
_options = options.Value; _options = options.Value;
_dbContext = dbContext; _dbContext = dbContext;
_logger = logger; _logger = logger;
_redisDataCacheService = redisDataCacheService; _redisDataCacheService = redisDataCacheService;
_producerService =producerService;
_tcpService=tcpService;
} }
/// <summary> /// <summary>
@ -399,5 +405,44 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS
_logger.LogWarning($"收到订阅消息: {obj}"); _logger.LogWarning($"收到订阅消息: {obj}");
return SubscribeAck.Success(); return SubscribeAck.Success();
} }
/// <summary>
/// 测试Kafka下发报文
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[AllowAnonymous]
public async Task<bool> KafkaSendAsync(KafkaSendDto input)
{
ArgumentException.ThrowIfNullOrWhiteSpace(input.Address);
ArgumentException.ThrowIfNullOrWhiteSpace(input.Frame);
input.Frame = input.Frame.Replace(" ", "");
await _producerService.ProduceAsync<KafkaSendDto>(ProtocolConst.TESTSENDTOPIC, input);
return await Task.FromResult(true);
}
/// <summary>
/// 订阅下发的数据
/// </summary>
/// <param name="dto"></param>
/// <returns></returns>
[KafkaSubscribe(ProtocolConst.TESTSENDTOPIC)]
public async Task<ISubscribeAck> KafkaSubscribeTestSendAsync(KafkaSendDto dto)
{
if (_tcpService.ClientExists(dto.Address))
{
// 发送给设备
await _tcpService.SendAsync(dto.Address, dto.Frame);
_logger.LogWarning($"{dto.Address}下发消息报文:{dto.Frame}");
}
else
{
_logger.LogWarning($"{dto.Address}集中器未上线: {dto.Serialize()}");
}
// 测试不管是否上线都ACK
return SubscribeAck.Success();
}
} }

View File

@ -170,5 +170,10 @@ namespace JiShe.CollectBus.Common.Consts
/// 测试主题格式 /// 测试主题格式
/// </summary> /// </summary>
public const string TESTTOPIC = "test-topic"; public const string TESTTOPIC = "test-topic";
/// <summary>
/// 测试下发主题格式
/// </summary>
public const string TESTSENDTOPIC = "test-send-topic";
} }
} }

View File

@ -16,7 +16,6 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0" /> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" /> <link href="libs/bootstrap/css/bootstrap.min.css" rel="stylesheet" />
<title>后端服务</title> <title>后端服务</title>
</head> </head>
<body> <body>