2025-04-17 20:28:50 +08:00

279 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using DeviceDetectorNET.Parser.Device;
using DotNetCore.CAP;
using JiShe.CollectBus.Ammeters;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Extensions;
using JiShe.CollectBus.Common.Helpers;
using JiShe.CollectBus.Enums;
using JiShe.CollectBus.Interceptors;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using MassTransit;
using Microsoft.Extensions.Logging;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Domain.Repositories;
using static FreeSql.Internal.GlobalFilter;
namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin
{
/// <summary>CAP publisher; in this part of the class it only appears in commented-out code.</summary>
private readonly ICapPublisher _producerBus;

/// <summary>Producer used to publish the login, heartbeat and normal-frame messages.</summary>
private readonly IProducerService _producerService;

/// <summary>Logger for connection events and frame-parsing failures.</summary>
private readonly ILogger<TcpMonitor> _logger;

/// <summary>Repository of <see cref="Device"/> records.</summary>
private readonly IRepository<Device, Guid> _deviceRepository;

/// <summary>Distributed cache of <see cref="AmmeterInfo"/>; not referenced in this part of the class.</summary>
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;

/// <summary>
/// Creates the plugin and stores every injected dependency in its backing field.
/// </summary>
/// <param name="producerBus">CAP publisher.</param>
/// <param name="producerService">Producer service used to publish received messages.</param>
/// <param name="logger">Logger for this plugin.</param>
/// <param name="deviceRepository">Repository of device records.</param>
/// <param name="ammeterInfoCache">Distributed cache of ammeter information.</param>
public TcpMonitor(ICapPublisher producerBus, IProducerService producerService,
    ILogger<TcpMonitor> logger,
    IRepository<Device, Guid> deviceRepository,
    IDistributedCache<AmmeterInfo> ammeterInfoCache)
{
    (_producerBus, _producerService, _logger, _deviceRepository, _ammeterInfoCache) =
        (producerBus, producerService, logger, deviceRepository, ammeterInfoCache);
}
/// <summary>
/// Handles a received TCP frame: renders the payload as a hex string, extracts the AFN, FN and A
/// fields from it, dispatches to the login, heartbeat or normal-frame handler, and finally invokes
/// the next plugin in the chain.
/// </summary>
/// <param name="client">Session that received the data; it is cast to <see cref="ITcpSessionClient"/> once the frame fields were extracted.</param>
/// <param name="e">Event arguments carrying the received bytes (<c>ByteBlock</c>) and the plugin-chain continuation.</param>
/// <returns>A task that completes after the frame was handled and the next plugin was invoked.</returns>
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
// Whole frame as a hex string; this string is what gets logged and forwarded downstream.
var messageHexString = Convert.ToHexString(e.ByteBlock.Span);
// Presumably splits the hex string into two-character byte pairs — verify against StringToPairs.
var hexStringList = messageHexString.StringToPairs();
// GetAnalyzeValue returns an untyped value; each field is cast to the type expected here.
var aFn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.AFN);
var fn = (int?)hexStringList.GetAnalyzeValue(CommandChunkEnum.FN);
var aTuple = (Tuple<string, int>)hexStringList.GetAnalyzeValue(CommandChunkEnum.A);
// Only dispatch when all three fields were extracted; Item1 of the A tuple is used as the device number.
if (aFn.HasValue && fn.HasValue && aTuple != null && !string.IsNullOrWhiteSpace(aTuple.Item1))
{
var tcpSessionClient = (ITcpSessionClient)client;
// NOTE(review): the enum member name after "AFN." is missing in this copy of the file (possibly
// dropped during extraction) — restore it from the repository before building. Under this AFN,
// FN 1 is handled as a login frame and FN 3 as a heartbeat frame.
if ((AFN)aFn == AFN.)
{
switch (fn)
{
case 1:
await OnTcpLoginReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
case 3:
// A heartbeat frame arrives in one of two situations:
// 1. The concentrator sent a login frame first and heartbeat frames afterwards.
// 2. The concentrator never sent a login frame, only heartbeat frames.
await OnTcpHeartbeatReceived(tcpSessionClient, messageHexString, aTuple.Item1);
break;
default:
// Any other FN under this AFN is reported as a parse failure.
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
break;
}
}
else
{
// Every other AFN is a normal frame. The AFN is passed as decimal text left-padded with '0' to
// two characters; OnTcpNormalReceived does not currently read that argument.
await OnTcpNormalReceived(tcpSessionClient, messageHexString, aTuple.Item1,aFn.ToString()!.PadLeft(2,'0'));
}
}
else
{
// At least one of AFN / FN / A could not be extracted from the frame.
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
}
// Hand the event over to the next plugin.
await e.InvokeNext();
}
//[GeneratorPlugin(typeof(ITcpConnectingPlugin))]
/// <summary>
/// Logs that a TCP session is connecting, then invokes the next plugin in the chain.
/// </summary>
/// <param name="client">Connecting session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Connecting event arguments used to continue the plugin chain.</param>
/// <returns>A task that completes once the next plugin was invoked.</returns>
public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
{
    ITcpSessionClient tcpSessionClient = (ITcpSessionClient)client;

    // Build the log line first so the logging call itself stays short.
    string connectingMessage = $"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}正在连接中...";
    _logger.LogInformation(connectingMessage);

    await e.InvokeNext();
}
//[GeneratorPlugin(typeof(ITcpConnectedPlugin))]
/// <summary>
/// Logs that a TCP session has connected, then invokes the next plugin in the chain.
/// </summary>
/// <param name="client">Connected session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Connected event arguments used to continue the plugin chain.</param>
/// <returns>A task that completes once the next plugin was invoked.</returns>
public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
    ITcpSessionClient tcpSessionClient = (ITcpSessionClient)client;

    string connectedMessage = $"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已连接";
    _logger.LogInformation(connectedMessage);

    await e.InvokeNext();
}
//[GeneratorPlugin(typeof(ITcpClosedPlugin))]//ITcpSessionClient
/// <summary>
/// Handles a closed TCP session: looks up the device whose ClientId equals the session id and
/// applies its on-closed update, or logs a warning when no such device is found. The next plugin
/// is invoked afterwards in both cases.
/// </summary>
/// <param name="client">Closed session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Closed event arguments used to continue the plugin chain.</param>
/// <returns>A task that completes once the next plugin was invoked.</returns>
public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
{
    ITcpSessionClient tcpSessionClient = (ITcpSessionClient)client;

    // NOTE(review): the login handler resets the session id to the device number but hands the
    // pre-reset id to the device record — confirm this ClientId comparison still finds the device
    // after such a reset.
    var closedDevice = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);

    if (closedDevice == null)
    {
        _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
    }
    else
    {
        closedDevice.UpdateByOnClosed();
        await _deviceRepository.UpdateAsync(closedDevice);
    }

    await e.InvokeNext();
}
/// <summary>
/// Handles a login frame: resets the session id to the concentrator number, inserts or updates the
/// matching <see cref="Device"/> record, and publishes a <see cref="MessageReceivedLogin"/> message.
/// When more than one device carries the same number, only an error is logged and nothing is
/// stored or published (the session id has already been reset at that point).
/// </summary>
/// <param name="client">Session that sent the frame.</param>
/// <param name="messageHexString">Received frame as a hex string.</param>
/// <param name="deviceNo">Concentrator number.</param>
/// <returns>A task that completes when handling has finished.</returns>
private async Task OnTcpLoginReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
    // Remember the id the session had before the reset; it is handed to the device record below.
    string previousSessionId = $"{client.Id}";
    await client.ResetIdAsync(deviceNo);

    var matchingDevices = await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
    bool numberIsDuplicated = matchingDevices != null && matchingDevices.Count > 1;
    if (numberIsDuplicated)
    {
        // TODO: push an alert for duplicated concentrator numbers.
        _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
        return;
    }

    var knownDevice = matchingDevices?.FirstOrDefault(a => a.Number == deviceNo);
    if (knownDevice != null)
    {
        knownDevice.UpdateByLoginAndHeartbeat(previousSessionId);
        await _deviceRepository.UpdateAsync(knownDevice);
    }
    else
    {
        // First time this concentrator number is seen: create its record as online.
        await _deviceRepository.InsertAsync(new Device(deviceNo, previousSessionId, DateTime.Now, DateTime.Now, DeviceStatus.Online));
    }

    //await _producerBus.PublishAsync(ProtocolConst.SubscriberLoginReceivedEventName, messageReceivedLoginEvent);
    //await _producerBus.Publish( messageReceivedLoginEvent);
    await _producerService.ProduceAsync(ProtocolConst.SubscriberLoginReceivedEventName, new MessageReceivedLogin
    {
        ClientId = deviceNo,
        ClientIp = client.IP,
        ClientPort = client.Port,
        MessageHexString = messageHexString,
        DeviceNo = deviceNo,
        MessageId = NewId.NextGuid().ToString()
    });
}
/// <summary>
/// Handles a heartbeat frame: inserts or updates the matching <see cref="Device"/> record and
/// publishes a <see cref="MessageReceivedHeartbeat"/> message. When more than one device carries
/// the same number, only an error is logged and nothing is stored or published.
/// </summary>
/// <param name="client">Session that sent the frame.</param>
/// <param name="messageHexString">Received frame as a hex string.</param>
/// <param name="deviceNo">Concentrator number; also used as the published ClientId.</param>
/// <returns>A task that completes when handling has finished.</returns>
private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo)
{
    string sessionIdOnArrival = $"{client.Id}";

    var matchingDevices = await _deviceRepository.GetListAsync(a => a.Number == deviceNo);
    bool numberIsDuplicated = matchingDevices != null && matchingDevices.Count > 1;
    if (numberIsDuplicated)
    {
        // TODO: push an alert for duplicated concentrator numbers.
        _logger.LogError($"集中器编号:{deviceNo},存在多个集中器,请检查集中器编号是否重复");
        return;
    }

    var knownDevice = matchingDevices?.FirstOrDefault(a => a.Number == deviceNo);
    if (knownDevice == null)
    {
        // Device that never sent a login frame, only heartbeats: reset the session id to the
        // device number here and create the device record.
        await client.ResetIdAsync(deviceNo);
        await _deviceRepository.InsertAsync(new Device(deviceNo, sessionIdOnArrival, DateTime.Now, DateTime.Now, DeviceStatus.Online));
    }
    else
    {
        // NOTE(review): when a record already exists the session id is not reset to deviceNo here,
        // yet the published ClientId is deviceNo — confirm this is intended.
        if (deviceNo == sessionIdOnArrival)
        {
            knownDevice.UpdateByLoginAndHeartbeat();
        }
        else
        {
            knownDevice.UpdateByLoginAndHeartbeat(sessionIdOnArrival);
        }

        await _deviceRepository.UpdateAsync(knownDevice);
    }

    var heartbeatMessage = new MessageReceivedHeartbeat
    {
        ClientId = deviceNo,
        ClientIp = client.IP,
        ClientPort = client.Port,
        MessageHexString = messageHexString,
        DeviceNo = deviceNo,
        MessageId = NewId.NextGuid().ToString()
    };

    //await _producerBus.PublishAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, messageReceivedHeartbeatEvent);
    //await _producerBus.Publish(messageReceivedHeartbeatEvent);
    await _producerService.ProduceAsync(ProtocolConst.SubscriberHeartbeatReceivedEventName, heartbeatMessage);
}
/// <summary>
/// Handles a normal (non-login, non-heartbeat) frame by publishing it as a
/// <see cref="MessageReceived"/> message under <c>ProtocolConst.SubscriberReceivedEventName</c>.
/// </summary>
/// <param name="client">Session that sent the frame; its id, IP and port are copied into the message.</param>
/// <param name="messageHexString">Received frame as a hex string.</param>
/// <param name="deviceNo">Concentrator number.</param>
/// <param name="aFn">AFN text supplied by the caller; not read by the current implementation.</param>
/// <returns>A task that completes when the message has been produced.</returns>
private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo,string aFn)
{
    //await _producerBus.Publish(new MessageReceived
    //{
    //    ClientId = client.Id,
    //    ClientIp = client.IP,
    //    ClientPort = client.Port,
    //    MessageHexString = messageHexString,
    //    DeviceNo = deviceNo,
    //    MessageId = NewId.NextGuid().ToString()
    //});

    //string topicName = string.Format(ProtocolConst.AFNTopicNameFormat, aFn);

    // TODO: how should the time tag be determined? Concentrator collection frequency is currently
    // fixed, so when data is reported the expected collection time is inferred from the current
    // time and used as the time tag. How should it be computed when, because of network problems,
    // data goes unreported for a long time?

    //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived
    //{
    //    ClientId = client.Id,
    //    ClientIp = client.IP,
    //    ClientPort = client.Port,
    //    MessageHexString = messageHexString,
    //    DeviceNo = deviceNo,
    //    MessageId = NewId.NextGuid().ToString()
    //});

    var receivedMessage = new MessageReceived
    {
        ClientId = client.Id,
        ClientIp = client.IP,
        ClientPort = client.Port,
        MessageHexString = messageHexString,
        DeviceNo = deviceNo,
        MessageId = NewId.NextGuid().ToString()
    };

    await _producerService.ProduceAsync(ProtocolConst.SubscriberReceivedEventName, receivedMessage);
}
}
}