using JiShe.CollectBus.Common;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Repositories;

namespace JiShe.CollectBus.Plugins
{
    /// <summary>
    /// TCP plugin that logs connection lifecycle events, passes received frames to the
    /// configured default protocol plugin for analysis, and updates the matching device
    /// record when a connection is closed.
    /// </summary>
    public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
    {
        // NOTE(review): ILogger, IRepository, IDistributedCache and IOptions appear here without
        // generic type arguments, yet the code below relies on `.Value` and a typed predicate.
        // Confirm these declarations against the authoritative copy of this file.
        // Some fields are not referenced in this part of the partial class; they are kept because
        // other parts of the class may use them.
        private readonly IProducerService _producerService;
        private readonly ILogger _logger;
        private readonly IRepository _deviceRepository;
        private readonly IDistributedCache _ammeterInfoCache;
        private readonly IServiceProvider _serviceProvider;
        private readonly IProtocolService _protocolService;
        private readonly ServerApplicationOptions _serverApplicationOptions;

        /// <summary>
        /// Creates the plugin and stores its injected dependencies.
        /// </summary>
        /// <param name="producerService">Kafka producer service; stored, not used in this part of the class.</param>
        /// <param name="logger">Logger used for connection and parsing diagnostics.</param>
        /// <param name="deviceRepository">Repository queried and updated when a connection closes.</param>
        /// <param name="ammeterInfoCache">Distributed cache; stored, not used in this part of the class.</param>
        /// <param name="serviceProvider">Service provider; stored, not used in this part of the class.</param>
        /// <param name="serverApplicationOptions">Options wrapper whose <c>Value</c> supplies <c>DefaultProtocolPlugin</c>.</param>
        /// <param name="protocolService">Service used to resolve the protocol plugin by name.</param>
        public TcpMonitor(IProducerService producerService,
            ILogger logger,
            IRepository deviceRepository,
            IDistributedCache ammeterInfoCache,
            IServiceProvider serviceProvider,
            IOptions serverApplicationOptions,
            IProtocolService protocolService)
        {
            _producerService = producerService;
            _logger = logger;
            _deviceRepository = deviceRepository;
            _ammeterInfoCache = ammeterInfoCache;
            _serviceProvider = serviceProvider;
            _protocolService = protocolService;
            _serverApplicationOptions = serverApplicationOptions.Value;
        }

        /// <summary>
        /// Handles received TCP data: resolves the default protocol plugin, converts the payload
        /// to a hex string and asks the plugin to analyze it. The event is always forwarded to
        /// the next plugin, including when configuration or plugin resolution fails.
        /// </summary>
        /// <param name="client">Session the data arrived on; cast to <c>ITcpSessionClient</c>.</param>
        /// <param name="e">Event args carrying the received byte block.</param>
        public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
        {
            var pluginName = _serverApplicationOptions.DefaultProtocolPlugin;

            // Without a configured plugin name there is nothing to resolve; log and pass the event on.
            if (string.IsNullOrWhiteSpace(pluginName))
            {
                _logger.LogError($"请配置默认的端到云协议插件!");
                await e.InvokeNext();
                return;
            }

            var protocolPlugin = await _protocolService.GetProtocolServiceAsync(pluginName);

            // Previously execution continued and dereferenced the null plugin, throwing a
            // NullReferenceException before the event could be forwarded.
            if (protocolPlugin is null)
            {
                _logger.LogError($"默认的端到云协议插件{_serverApplicationOptions.DefaultProtocolPlugin}不存在!");
                await e.InvokeNext();
                return;
            }

            var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
            var tcpSessionClient = (ITcpSessionClient)client;

            TB3761? tB3761 = await protocolPlugin.AnalyzeAsync(tcpSessionClient, messageHexString);
            if (tB3761 == null)
            {
                // TODO: not handled for now; to be dealt with later.
                _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
            }

            await e.InvokeNext();
        }

        /// <summary>
        /// Logs that a client is connecting, then forwards the event.
        /// </summary>
        /// <param name="client">Connecting session; cast to <c>ITcpSessionClient</c> to read its Id.</param>
        /// <param name="e">Connecting event args.</param>
        public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
        {
            var tcpSessionClient = (ITcpSessionClient)client;
            _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}正在连接中...");
            await e.InvokeNext();
        }

        /// <summary>
        /// Logs that a client has connected, then forwards the event.
        /// </summary>
        /// <param name="client">Connected session; cast to <c>ITcpSessionClient</c> to read its Id.</param>
        /// <param name="e">Connected event args.</param>
        public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
        {
            var tcpSessionClient = (ITcpSessionClient)client;
            _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已连接");
            await e.InvokeNext();
        }

        /// <summary>
        /// Looks up the device whose ClientId matches the closed session. When found, calls
        /// <c>UpdateByOnClosed</c> on it and persists the entity; otherwise logs a warning.
        /// The event is then forwarded.
        /// </summary>
        /// <param name="client">Closed session; cast to <c>ITcpSessionClient</c> to read its Id.</param>
        /// <param name="e">Closed event args.</param>
        public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
        {
            // TODO: remove the Redis cache entry.
            var tcpSessionClient = (ITcpSessionClient)client;
            var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
            if (entity != null)
            {
                entity.UpdateByOnClosed();
                await _deviceRepository.UpdateAsync(entity);
            }
            else
            {
                _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
            }

            await e.InvokeNext();
        }
    }
}