using System; using System.Threading.Tasks; using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Devices; using JiShe.CollectBus.Enums; using JiShe.CollectBus.Interceptors; using JiShe.CollectBus.MessageReceiveds; using JiShe.CollectBus.Protocol.Contracts; using MassTransit; using Microsoft.Extensions.Logging; using TouchSocket.Core; using TouchSocket.Sockets; using Volo.Abp.Caching; using Volo.Abp.DependencyInjection; using Volo.Abp.Domain.Entities; using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency { private readonly ICapPublisher _capBus; private readonly ILogger _logger; private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; /// /// /// /// /// /// /// public TcpMonitor(ICapPublisher capBus, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) { _capBus = capBus; _logger = logger; _deviceRepository = deviceRepository; _ammeterInfoCache = ammeterInfoCache; } [GeneratorPlugin(typeof(ITcpReceivedPlugin))] public async Task OnTcpReceived(ITcpSessionClient client, ReceivedDataEventArgs e) { var messageHexString = Convert.ToHexString(e.ByteBlock.Span); var hexStringList = messageHexString.StringToPairs(); var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN); var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN); var aTuple = (Tuple)hexStringList.GetAnalyzeValue(CommandChunkEnum.A); if (aFn.HasValue && fn.HasValue) { if ((AFN)aFn == AFN.链路接口检测) { switch (fn) { case 1: await OnTcpLoginReceived(client, messageHexString, aTuple.Item1); break; case 3: await OnTcpHeartbeatReceived(client, messageHexString, aTuple.Item1); break; default: _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); break; } } else { await OnTcpNormalReceived(client, messageHexString, aTuple.Item1); } } else { _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); } await e.InvokeNext(); } [GeneratorPlugin(typeof(ITcpConnectingPlugin))] public async Task OnTcpConnecting(ITcpSessionClient client, ConnectingEventArgs e) { _logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}正在连接中..."); await e.InvokeNext(); } [GeneratorPlugin(typeof(ITcpConnectedPlugin))] public async Task OnTcpConnected(ITcpSessionClient client, ConnectedEventArgs e) { _logger.LogInformation($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已连接"); await e.InvokeNext(); } [GeneratorPlugin(typeof(ITcpClosedPlugin))] public async Task OnTcpClosed(ITcpSessionClient client, ClosedEventArgs e) { var entity = await _deviceRepository.FindAsync(a=>a.ClientId == client.Id); if (entity != null) { entity.UpdateByOnClosed(); await _deviceRepository.UpdateAsync(entity); } else { _logger.LogWarning($"[TCP] ID:{client.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败"); } await e.InvokeNext(); } private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo) { var messageReceivedLoginEvent = new MessageReceivedLogin { ClientId = client.Id, ClientIp = client.IP, ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); if (entity == null) { await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id, DateTime.Now, DateTime.Now, DeviceStatus.Online)); } else { entity.UpdateByLoginAndHeartbeat(client.Id); await _deviceRepository.UpdateAsync(entity); } } private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo) { var messageReceivedHeartbeatEvent = new MessageReceivedHeartbeat { ClientId = client.Id, ClientIp = client.IP, ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); var entity = await _deviceRepository.FindAsync(a => a.Number == deviceNo); if (entity == null) { await _deviceRepository.InsertAsync(new Device(deviceNo, client.Id, DateTime.Now, DateTime.Now, DeviceStatus.Online)); } else { entity.UpdateByLoginAndHeartbeat(client.Id); await _deviceRepository.UpdateAsync(entity); } } private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo) { await _capBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived { ClientId = client.Id, ClientIp = client.IP, ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }); } } }