diff --git a/Directory.Build.props b/Directory.Build.props index e79fc14..33557c4 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -2,7 +2,7 @@ - 1.0.5.04 + 1.0.5.05 9.1.1 diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs new file mode 100644 index 0000000..fc448b9 --- /dev/null +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IServiceCommunicationChannelSubscriberService.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Volo.Abp.Application.Services; + +namespace JiShe.CollectBus.Subscribers +{ + /// + /// 数据通讯通道订阅者服务 + /// + public interface IServiceCommunicationChannelSubscriberService:IApplicationService + { + /// + /// 数据通讯通道消息消费订阅 + /// + /// + /// + Task ServiceCommunicationChannelIssuedEvent(Dictionary issuedMessage); + } +} diff --git a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index f420a28..5244e57 100644 --- a/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/services/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Common; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.Protocol.Interfaces; @@ -17,37 +18,37 @@ namespace JiShe.CollectBus.Plugins { private readonly IProducerService _producerService; private readonly ILogger _logger; - private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; private readonly IServiceProvider _serviceProvider; private readonly IProtocolService _protocolService; private readonly ServerApplicationOptions _serverApplicationOptions; + private readonly IFreeRedisProvider _freeRedisProvider; /// /// /// /// /// - /// /// /// /// /// + /// public TcpMonitor(IProducerService producerService, ILogger logger, - IRepository deviceRepository, IDistributedCache ammeterInfoCache, IServiceProvider serviceProvider, IOptions serverApplicationOptions, - IProtocolService protocolService) + IProtocolService protocolService, + IFreeRedisProvider freeRedisProvider) { _producerService = producerService; _logger = logger; - _deviceRepository = deviceRepository; _ammeterInfoCache = ammeterInfoCache; _serviceProvider= serviceProvider; _protocolService = protocolService; _serverApplicationOptions = serverApplicationOptions.Value; + _freeRedisProvider = freeRedisProvider; } public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e) @@ -71,6 +72,61 @@ namespace JiShe.CollectBus.Plugins _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); } + //登录和心跳的时候,需要过滤TCP连接 + if (tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳) + { + string focusAddress = tB3761.A.Code; + if (string.IsNullOrWhiteSpace(focusAddress))//集中器地址为空,主动关闭连接 + { + await tcpSessionClient.CloseAsync(); + _logger.LogError($"指令解析失败,没有找到集中器地址,指令内容:{messageHexString}"); + return; + } + + //查找Redis缓存的集中器信息 + var focusCacheInfos = _freeRedisProvider.Instance.HGet(RedisConst.CacheAllFocusInfoHashKey, focusAddress); + if (focusCacheInfos == null) + { + await tcpSessionClient.CloseAsync(); + _logger.LogError($"TCP连接关闭,没有找到集中器地址{focusAddress}的缓存信息"); + return; + } + + //检查集中器的信息的服务器标记是否为空,如果不为空,需要检查是否与当前服务器的标记一致 + if (!string.IsNullOrWhiteSpace(focusCacheInfos.ServerTagName) && focusCacheInfos.ServerTagName != _serverApplicationOptions.ServerTagName) + { + await tcpSessionClient.CloseAsync(); + _logger.LogError($"TCP连接关闭,集中器地址{focusAddress}的服务器标记为:{focusCacheInfos.ServerTagName},当前服务器的标记为:{_serverApplicationOptions.ServerTagName},不一致拒绝连接。"); + return; + } + + int currentTCPConnectionCount = tcpSessionClient.Service.Count; + int configTCPConnectionCount = _serverApplicationOptions.TCPConnectionCount;//配置TCP连接数量 + + _logger.LogWarning($"当前配置连接数量为:{configTCPConnectionCount},当前连接数量为:{currentTCPConnectionCount}"); + + if (currentTCPConnectionCount >= configTCPConnectionCount) + { + _logger.LogError($"当前连接数量为:{currentTCPConnectionCount},大于配置连接数量{configTCPConnectionCount},将拒绝集中{focusAddress}连接"); + await tcpSessionClient.CloseAsync(); + return; + } + + //缓存更新状态 + focusCacheInfos.LastSurvivalTime = DateTime.Now; + focusCacheInfos.ServerTagName = _serverApplicationOptions.ServerTagName; + focusCacheInfos.OnLineStatus = true; + + _freeRedisProvider.Instance.HSet(RedisConst.CacheAllFocusInfoHashKey, focusAddress, focusCacheInfos); + + //连接成功以后,通知信息到后台 + Dictionary channelMessage = new Dictionary(); + channelMessage.Add(ServiceCommunicationTypeEnum.FocusStatusReceived, focusCacheInfos.Serialize()); + _= _producerService.ProduceAsync(KafkaTopicConsts.ServiceCommunicationChannelTopic, channelMessage); + } + + //todo 后续可以考虑Redis的bitmap做日活签到。 + // TODO:要注意保存树模型的时时标字段,如果没有时标,则取接受时间做兼容 await e.InvokeNext(); } @@ -94,21 +150,23 @@ namespace JiShe.CollectBus.Plugins } public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e) - { - //todo: 删除Redis缓存 - var tcpSessionClient = (ITcpSessionClient)client; - //var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id); - //if (entity != null) - //{ - // entity.UpdateByOnClosed(); - // await _deviceRepository.UpdateAsync(entity); - //} - //else - //{ - //_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败"); - //} + { + var tcpSessionClient = (ITcpSessionClient)client; _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接"); + //查找Redis缓存的集中器信息 + var focusCacheInfos = _freeRedisProvider.Instance.HGet(RedisConst.CacheAllFocusInfoHashKey, tcpSessionClient.Id); + if (focusCacheInfos != null) + { + + //缓存更新状态 + focusCacheInfos.LastSurvivalTime = DateTime.Now; + focusCacheInfos.ServerTagName = _serverApplicationOptions.ServerTagName; + focusCacheInfos.OnLineStatus = false; + + _freeRedisProvider.Instance.HSet(RedisConst.CacheAllFocusInfoHashKey, tcpSessionClient.Id, focusCacheInfos); + } + await e.InvokeNext(); } diff --git a/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs b/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs new file mode 100644 index 0000000..8ee60e6 --- /dev/null +++ b/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs @@ -0,0 +1,91 @@ +using JiShe.CollectBus.Protocol.Interfaces; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using TouchSocket.Sockets; + +namespace JiShe.CollectBus.Subscribers +{ + public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe + { + private readonly ILogger _logger; + private readonly ITcpService _tcpService; + private readonly IServiceProvider _serviceProvider; + private readonly IoTDBSessionPoolProvider _dbProvider; + private readonly IProtocolService _protocolService; + + + public ServiceCommunicationChannelSubscriberService(ILogger logger, + ITcpService tcpService, + IServiceProvider serviceProvider, + IoTDBSessionPoolProvider dbProvider, + IProtocolService protocolService) + { + _logger = logger; + _tcpService = tcpService; + _serviceProvider = serviceProvider; + _dbProvider = dbProvider; + _protocolService = protocolService; + } + + + /// + /// 数据通讯通道消息消费订阅 + /// + /// + /// + [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)] + public async Task ServiceCommunicationChannelIssuedEvent(Dictionary issuedMessage) + { + Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}"); + var tempFirstKeyInfo = issuedMessage.FirstOrDefault(); + if (tempFirstKeyInfo.Value == null) + { + return SubscribeAck.Fail(); + } + + ServiceCommunicationTypeEnum serviceCommunication = tempFirstKeyInfo.Key; + bool tempResult = false; + switch (serviceCommunication) + { + case ServiceCommunicationTypeEnum.ArchivalDataIssued: + if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value)) + { + FocusCacheInfos focusCacheInfo = tempFirstKeyInfo.Value.Deserialize(); + tempResult = await SendArchivalDataIssued(focusCacheInfo); + } + break; + default: + throw new Exception("暂不支持该数据通讯通道消息消费订阅解析"); + } + + return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail(); + } + + protected async Task SendArchivalDataIssued(FocusCacheInfos focusCacheInfo) + { + try + { + var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress); + if (checkResult) + { + string issuedMessageHexString = ""; + await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString)); + + return true; + } + else + { + return false; + } + } + catch (Exception) + { + throw; + } + } + } +}