using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
using static System.Formats.Asn1.AsnWriter;
using static FreeSql.Internal.GlobalFilter;

namespace JiShe.CollectBus.Plugins
{
    /// <summary>
    /// TCP plugin that logs connection lifecycle events, updates the stored device
    /// record when a session closes, and forwards every successfully parsed frame
    /// to a topic through <see cref="IProducerService"/>.
    /// </summary>
    public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
    {
        // NOTE(review): ILogger, IRepository and IDistributedCache are written here
        // without generic type arguments, exactly as they appear in the reviewed
        // text. They look like they lost their type arguments somewhere upstream;
        // confirm the intended closed generic types against the real file.
        private readonly IProducerService _producerService;
        private readonly ILogger _logger;
        private readonly IRepository _deviceRepository;

        // Not read by any member visible in this part of the partial class.
        private readonly IDistributedCache _ammeterInfoCache;
        private readonly IServiceProvider _serviceProvider;

        /// <summary>
        /// Creates the plugin and stores its injected dependencies.
        /// </summary>
        /// <param name="producerService">Producer used to publish received frames.</param>
        /// <param name="logger">Logger for connection and parsing diagnostics.</param>
        /// <param name="deviceRepository">Repository queried and updated when a session closes.</param>
        /// <param name="ammeterInfoCache">Distributed cache; stored but not used by the members shown here.</param>
        /// <param name="serviceProvider">Provider used to resolve the keyed protocol plugin.</param>
        public TcpMonitor(IProducerService producerService, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache, IServiceProvider serviceProvider)
        {
            _producerService = producerService;
            _logger = logger;
            _deviceRepository = deviceRepository;
            _ammeterInfoCache = ammeterInfoCache;
            _serviceProvider = serviceProvider;
        }

        /// <summary>
        /// Converts the received bytes to a hex string, asks the keyed
        /// "StandardProtocolPlugin" service to analyze it and, when analysis yields
        /// a frame, dispatches it via <see cref="OnTcpNormalReceived"/>. The next
        /// plugin in the chain is always invoked afterwards.
        /// </summary>
        /// <param name="client">Session that received the data; cast to <see cref="ITcpSessionClient"/>.</param>
        /// <param name="e">Received-data event arguments carrying the byte block.</param>
        public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
        {
            var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
            var tcpSessionClient = (ITcpSessionClient)client;

            // NOTE(review): this call carries no generic type argument in the
            // reviewed text; confirm the service type that is meant to be resolved.
            var protocolPlugin = _serviceProvider.GetKeyedService("StandardProtocolPlugin");
            if (protocolPlugin == null)
            {
                // Without a protocol plugin the frame cannot be analyzed. Stop here
                // instead of dereferencing null, but keep the plugin chain running.
                _logger.LogError("协议不存在!");
                await e.InvokeNext();
                return;
            }

            TB3761? tB3761 = await protocolPlugin.AnalyzeAsync(tcpSessionClient, messageHexString);
            if (tB3761 == null)
            {
                _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
            }
            else
            {
                await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761);
            }

            await e.InvokeNext();
        }

        /// <summary>
        /// Logs that a client is connecting, then invokes the next plugin.
        /// </summary>
        /// <param name="client">Connecting session; cast to <see cref="ITcpSessionClient"/> to read its id.</param>
        /// <param name="e">Connecting event arguments.</param>
        //[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
        public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
        {
            var tcpSessionClient = (ITcpSessionClient)client;
            _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}正在连接中...");
            await e.InvokeNext();
        }

        /// <summary>
        /// Logs that a client has connected, then invokes the next plugin.
        /// </summary>
        /// <param name="client">Connected session; cast to <see cref="ITcpSessionClient"/> to read its id.</param>
        /// <param name="e">Connected event arguments.</param>
        //[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
        public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
        {
            var tcpSessionClient = (ITcpSessionClient)client;
            _logger.LogInformation($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已连接");
            await e.InvokeNext();
        }

        /// <summary>
        /// Looks up the device whose <c>ClientId</c> equals the closed session's id.
        /// When found, calls <c>UpdateByOnClosed</c> on it and saves it; otherwise
        /// logs a warning. The next plugin is invoked in both cases.
        /// </summary>
        /// <param name="client">Closed session; cast to <see cref="ITcpSessionClient"/> to read its id.</param>
        /// <param name="e">Closed event arguments.</param>
        //[GeneratorPlugin(typeof(ITcpClosedPlugin))]//ITcpSessionClient
        public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
        {
            var tcpSessionClient = (ITcpSessionClient)client;
            var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
            if (entity != null)
            {
                entity.UpdateByOnClosed();
                await _deviceRepository.UpdateAsync(entity);
            }
            else
            {
                _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
            }

            await e.InvokeNext();
        }

        /// <summary>
        /// Handles a normally parsed frame by publishing it to a topic derived from
        /// its AFN. When the derived topic is not among the names returned by
        /// <c>ProtocolConstExtensions.GetAllTopicNamesByReceived()</c>, an error is
        /// logged and the frame is published to
        /// <c>ProtocolConst.SubscriberReceivedEventName</c> instead.
        /// </summary>
        /// <param name="tcpSessionClient">Session the frame arrived on; supplies client id, IP and port.</param>
        /// <param name="messageHexString">Raw frame as a hex string.</param>
        /// <param name="tB3761">Parsed frame; nothing is published when its AFN or Fn is missing.</param>
        private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, string messageHexString, TB3761? tB3761)
        {
            // TODO: how should the time marker be determined? The concentrator's
            // collection frequency is currently fixed, so on report the expected
            // collection time point is inferred from the current time and used as
            // the time marker. How should it be computed when network problems keep
            // data from being reported for a long time?

            if (tB3761?.AFN_FC?.AFN == null || tB3761.DT?.Fn == null)
            {
                _logger.LogError("376.1协议解析AFN失败");
                return;
            }

            // Login and heartbeat frames are already handled elsewhere, so they
            // were meant to be ignored here; the filter is currently disabled:
            //if(tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳)
            //    return;

            // TODO: route by AFN and push to Kafka.
            // The AFN text is left-padded with '0' to at least two characters.
            string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC?.AFN.ToString().PadLeft(2, '0'));
            var topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();

            // The payload is identical for both destinations, so build it once.
            var message = new MessageReceived
            {
                ClientId = tcpSessionClient.Id,
                ClientIp = tcpSessionClient.IP,
                ClientPort = tcpSessionClient.Port,
                MessageHexString = messageHexString,
                DeviceNo = tB3761.A?.Code!,
                MessageId = Guid.NewGuid().ToString()
            };

            if (topics.Contains(topicName))
            {
                await _producerService.ProduceAsync(topicName, message);
            }
            else
            {
                _logger.LogError($"不支持的上报kafka主题:{topicName}");
                await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, message);
            }
        }
    }
}