diff --git a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs index 12f46d1..c32d892 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Samples/SampleDto.cs @@ -14,3 +14,12 @@ public class SampleDto2 { public int Value { get; set; } } + + +public class KafkaSendDto +{ + public string Address { get; set; } + + public string Frame { get; set; } + +} diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 25d16f3..049db1f 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -100,16 +100,17 @@ namespace JiShe.CollectBus.Plugins { //todo: 删除Redis缓存 var tcpSessionClient = (ITcpSessionClient)client; - var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id); - if (entity != null) - { - entity.UpdateByOnClosed(); - await _deviceRepository.UpdateAsync(entity); - } - else - { - _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败"); - } + //var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id); + //if (entity != null) + //{ + // entity.UpdateByOnClosed(); + // await _deviceRepository.UpdateAsync(entity); + //} + //else + //{ + //_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败"); + //} + _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接"); await e.InvokeNext(); } diff --git a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs index ea2415e..4cb325c 100644 --- a/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs +++ b/services/JiShe.CollectBus.Application/Samples/SampleAppService.cs @@ -14,6 +14,7 @@ using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Interfaces; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -24,6 +25,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading.Tasks; +using TouchSocket.Sockets; namespace JiShe.CollectBus.Samples; @@ -35,14 +37,18 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS private readonly IoTDbOptions _options; private readonly IRedisDataCacheService _redisDataCacheService; + private readonly IProducerService _producerService; + private readonly ITcpService _tcpService; public SampleAppService(IIoTDbProvider iotDBProvider, IOptions options, - IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService) + IoTDBRuntimeContext dbContext, ILogger logger, IRedisDataCacheService redisDataCacheService, IProducerService producerService, ITcpService tcpService) { _iotDBProvider = iotDBProvider; _options = options.Value; _dbContext = dbContext; _logger = logger; _redisDataCacheService = redisDataCacheService; + _producerService =producerService; + _tcpService=tcpService; } /// @@ -399,5 +405,44 @@ public class SampleAppService : CollectBusAppService, ISampleAppService, IKafkaS _logger.LogWarning($"收到订阅消息: {obj}"); return SubscribeAck.Success(); } + + /// + /// 测试Kafka下发报文 + /// + /// + /// + [AllowAnonymous] + public async Task KafkaSendAsync(KafkaSendDto input) + { + ArgumentException.ThrowIfNullOrWhiteSpace(input.Address); + ArgumentException.ThrowIfNullOrWhiteSpace(input.Frame); + input.Frame = input.Frame.Replace(" ", ""); + await _producerService.ProduceAsync(ProtocolConst.TESTSENDTOPIC, input); + return await Task.FromResult(true); + } + + + /// + /// 订阅下发的数据 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.TESTSENDTOPIC)] + + public async Task KafkaSubscribeTestSendAsync(KafkaSendDto dto) + { + if (_tcpService.ClientExists(dto.Address)) + { + // 发送给设备 + await _tcpService.SendAsync(dto.Address, dto.Frame); + _logger.LogWarning($"{dto.Address}下发消息报文:{dto.Frame}"); + } + else + { + _logger.LogWarning($"{dto.Address}集中器未上线: {dto.Serialize()}"); + } + // 测试不管是否上线都ACK + return SubscribeAck.Success(); + } } diff --git a/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs b/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs index 95c2c44..0758899 100644 --- a/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs +++ b/shared/JiShe.CollectBus.Common/Consts/ProtocolConst.cs @@ -170,5 +170,10 @@ namespace JiShe.CollectBus.Common.Consts /// 测试主题格式 /// public const string TESTTOPIC = "test-topic"; + + /// + /// 测试下发主题格式 + /// + public const string TESTSENDTOPIC = "test-send-topic"; } } diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index 30e91e8..a58ded5 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,7 +16,6 @@ 后端服务 -