2025-04-24 00:37:00 +08:00

200 lines
8.2 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.Protocol.Contracts.Abstracts;
using JiShe.CollectBus.Protocol.Contracts.Interfaces;
using JiShe.CollectBus.Protocol.Contracts.Models;
using JiShe.CollectBus.Protocol.Contracts.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
using static System.Formats.Asn1.AsnWriter;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Plugins
{
/// <summary>
/// TCP plugin that logs connection lifecycle events, updates the matching
/// <see cref="Device"/> record when a session closes, and forwards every received
/// frame (parsed with the "376.1" protocol plugin) to the producer service, using a
/// topic derived from the frame's AFN.
/// </summary>
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
    private readonly IProducerService _producerService;
    private readonly ILogger<TcpMonitor> _logger;
    private readonly IRepository<Device, Guid> _deviceRepository;
    // NOTE(review): _ammeterInfoCache and _serviceProvider are not used in this part of the
    // partial class; they are kept because other parts of the class may rely on them.
    private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
    private readonly IServiceProvider _serviceProvider;
    private readonly IProtocolService _protocolService;

    /// <summary>
    /// Creates the plugin and stores its injected dependencies.
    /// </summary>
    /// <param name="producerService">Producer used to publish received frames to a topic.</param>
    /// <param name="logger">Logger for connection and parsing diagnostics.</param>
    /// <param name="deviceRepository">Repository used to look up and update devices by client id.</param>
    /// <param name="ammeterInfoCache">Distributed cache of ammeter information.</param>
    /// <param name="serviceProvider">Service provider of the hosting container.</param>
    /// <param name="protocolService">Service used to resolve the protocol plugin by name.</param>
    public TcpMonitor(IProducerService producerService,
        ILogger<TcpMonitor> logger,
        IRepository<Device, Guid> deviceRepository,
        IDistributedCache<AmmeterInfo> ammeterInfoCache, IServiceProvider serviceProvider, IProtocolService protocolService)
    {
        _producerService = producerService;
        _logger = logger;
        _deviceRepository = deviceRepository;
        _ammeterInfoCache = ammeterInfoCache;
        _serviceProvider = serviceProvider;
        _protocolService = protocolService;
    }

    /// <summary>
    /// Handles a received TCP frame: converts the payload to a hex string, parses it with the
    /// "376.1" protocol plugin and, when parsing succeeds, dispatches it via
    /// <see cref="OnTcpNormalReceived"/>. The next plugin in the chain is always invoked.
    /// </summary>
    /// <param name="client">The session that received the data; must be an <see cref="ITcpSessionClient"/>.</param>
    /// <param name="e">Event arguments carrying the received bytes.</param>
    public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
    {
        var messageHexString = Convert.ToHexString(e.ByteBlock.Span);

        var protocolPlugin = await _protocolService.GetProtocolServiceAsync("376.1");
        if (protocolPlugin == null)
        {
            // Without a protocol plugin the frame cannot be parsed. Log and hand over to the
            // next plugin instead of dereferencing null (which previously threw here).
            _logger.LogError("协议不存在!");
            await e.InvokeNext();
            return;
        }

        var tcpSessionClient = (ITcpSessionClient)client;
        TB3761? tB3761 = await protocolPlugin.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString);
        if (tB3761 == null)
        {
            _logger.LogError("指令初步解析失败,指令内容:{MessageHexString}", messageHexString);
        }
        else
        {
            await OnTcpNormalReceived(tcpSessionClient, messageHexString, tB3761);
        }

        await e.InvokeNext();
    }

    /// <summary>
    /// Logs that a client is in the process of connecting, then invokes the next plugin.
    /// </summary>
    /// <param name="client">The connecting session; must be an <see cref="ITcpSessionClient"/>.</param>
    /// <param name="e">Connecting event arguments.</param>
    //[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
    public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
    {
        var tcpSessionClient = (ITcpSessionClient)client;
        _logger.LogInformation("[TCP] ID:{ClientId} IP:{IpPort}正在连接中...", tcpSessionClient.Id, client.GetIPPort());
        await e.InvokeNext();
    }

    /// <summary>
    /// Logs that a client has connected, then invokes the next plugin.
    /// </summary>
    /// <param name="client">The connected session; must be an <see cref="ITcpSessionClient"/>.</param>
    /// <param name="e">Connected event arguments.</param>
    //[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
    public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
    {
        var tcpSessionClient = (ITcpSessionClient)client;
        _logger.LogInformation("[TCP] ID:{ClientId} IP:{IpPort}已连接", tcpSessionClient.Id, client.GetIPPort());
        await e.InvokeNext();
    }

    /// <summary>
    /// Handles a closed session: looks up the device whose ClientId equals the session id,
    /// calls <c>UpdateByOnClosed()</c> on it and persists the change. If no device is found a
    /// warning is logged. The next plugin in the chain is always invoked.
    /// </summary>
    /// <param name="client">The closed session; must be an <see cref="ITcpSessionClient"/>.</param>
    /// <param name="e">Closed event arguments.</param>
    //[GeneratorPlugin(typeof(ITcpClosedPlugin))]//ITcpSessionClient
    public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
    {
        var tcpSessionClient = (ITcpSessionClient)client;
        var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
        if (entity != null)
        {
            entity.UpdateByOnClosed();
            await _deviceRepository.UpdateAsync(entity);
        }
        else
        {
            _logger.LogWarning("[TCP] ID:{ClientId} IP:{IpPort}已关闭连接,但采集程序检索失败", tcpSessionClient.Id, client.GetIPPort());
        }

        await e.InvokeNext();
    }

    /// <summary>
    /// Dispatches a successfully parsed frame to a topic chosen by its AFN. When the
    /// AFN-specific topic is not one of the received-topic names, an error is logged and the
    /// frame is published to <c>ProtocolConst.SubscriberReceivedEventName</c> instead.
    /// </summary>
    /// <param name="tcpSessionClient">Session the frame was received on (source of client id, IP and port).</param>
    /// <param name="messageHexString">Raw frame as an upper-case hex string.</param>
    /// <param name="tB3761">Parsed frame; nothing is published when its AFN or Fn is missing.</param>
    /// <returns>A task that completes once the frame has been handed to the producer.</returns>
    private async Task OnTcpNormalReceived(ITcpSessionClient tcpSessionClient, string messageHexString, TB3761? tB3761)
    {
        // TODO: how should the time mark be determined? The concentrator's collection frequency
        // is currently fixed, so on report the expected collection time is inferred from the
        // current time. It is undecided how to compute it when network problems delay reports.

        if (tB3761?.AFN_FC?.AFN == null || tB3761.DT?.Fn == null)
        {
            _logger.LogError("376.1协议解析AFN失败");
            return;
        }

        // NOTE: filtering out login / heartbeat frames (FN.登录 / FN.心跳) is currently disabled
        // here, although they are said to be handled elsewhere.

        // TODO: route by AFN to the matching Kafka topic.
        // The AFN is rendered as text and left-padded with '0' to at least two characters.
        string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, tB3761.AFN_FC?.AFN.ToString().PadLeft(2, '0'));

        // The payload is identical for both destinations, so it is built once.
        var message = new MessageReceived
        {
            ClientId = tcpSessionClient.Id,
            ClientIp = tcpSessionClient.IP,
            ClientPort = tcpSessionClient.Port,
            MessageHexString = messageHexString,
            DeviceNo = tB3761.A?.Code!,
            MessageId = Guid.NewGuid().ToString()
        };

        List<string> topics = ProtocolConstExtensions.GetAllTopicNamesByReceived();
        if (topics.Contains(topicName))
        {
            await _producerService.ProduceAsync(topicName, message);
        }
        else
        {
            _logger.LogError("不支持的上报kafka主题{TopicName}", topicName);
            await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, message);
        }
    }
}
}