2025-06-05 14:14:13 +08:00

181 lines
8.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Protocol.Interfaces;
using JiShe.CollectBus.Protocol3761;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using TouchSocket.Core;
using TouchSocket.Sockets;
using Volo.Abp.Caching;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Repositories;
namespace JiShe.CollectBus.Plugins
{
public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin, ITcpClosingPlugin
{
/// <summary>Injected producer service; only the commented-out login/heartbeat code in this part of the partial class refers to it.</summary>
private readonly IProducerService _producerService;

/// <summary>Logger used by every TCP event handler in this class.</summary>
private readonly ILogger<TcpMonitor> _logger;

/// <summary>Injected distributed cache of <see cref="AmmeterInfo"/>; not referenced by the code visible in this part of the partial class.</summary>
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;

/// <summary>Injected service provider; not referenced by the code visible in this part of the partial class.</summary>
private readonly IServiceProvider _serviceProvider;

/// <summary>Resolves the protocol plugin that analyzes received frames.</summary>
private readonly IProtocolService _protocolService;

/// <summary>Server options snapshot (default protocol plugin name, server tag name) unwrapped from <see cref="IOptions{TOptions}"/>.</summary>
private readonly ServerApplicationOptions _serverApplicationOptions;

/// <summary>Redis access used to read and update the cached concentrator (focus) info.</summary>
private readonly IFreeRedisProvider _freeRedisProvider;

/// <summary>
/// Creates the TCP monitor plugin and stores its injected dependencies.
/// </summary>
/// <param name="producerService">Producer service kept in <c>_producerService</c>.</param>
/// <param name="logger">Logger for connection and frame-analysis messages.</param>
/// <param name="ammeterInfoCache">Distributed cache of <see cref="AmmeterInfo"/> kept in <c>_ammeterInfoCache</c>.</param>
/// <param name="serviceProvider">Service provider kept in <c>_serviceProvider</c>.</param>
/// <param name="serverApplicationOptions">Options wrapper; its <c>Value</c> is read once here and stored.</param>
/// <param name="protocolService">Service used to look up the default protocol plugin.</param>
/// <param name="freeRedisProvider">Provider of the Redis client used for the concentrator cache.</param>
public TcpMonitor(
    IProducerService producerService,
    ILogger<TcpMonitor> logger,
    IDistributedCache<AmmeterInfo> ammeterInfoCache,
    IServiceProvider serviceProvider,
    IOptions<ServerApplicationOptions> serverApplicationOptions,
    IProtocolService protocolService,
    IFreeRedisProvider freeRedisProvider)
{
    // Assignments follow the parameter order.
    _producerService = producerService;
    _logger = logger;
    _ammeterInfoCache = ammeterInfoCache;
    _serviceProvider = serviceProvider;
    _serverApplicationOptions = serverApplicationOptions.Value;
    _protocolService = protocolService;
    _freeRedisProvider = freeRedisProvider;
}
/// <summary>
/// Handles data received on a TCP session: resolves the configured default protocol plugin,
/// lets it analyze the received bytes (passed as a hex string) into a <see cref="TB3761"/>,
/// and then hands the event to the next plugin.
/// </summary>
/// <remarks>
/// When no default protocol plugin is configured, or the configured plugin cannot be resolved,
/// the error is logged and the method returns without analyzing the frame and without invoking
/// the next plugin. Previously these paths fell through and dereferenced a null plugin.
/// A frame that the plugin cannot analyze (null result) is only logged; the event is still
/// passed on.
/// </remarks>
/// <param name="client">Session the data arrived on; it is cast to <see cref="ITcpSessionClient"/>.</param>
/// <param name="e">Received-data event arguments carrying the raw bytes.</param>
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
{
    var defaultProtocolPlugin = _serverApplicationOptions.DefaultProtocolPlugin;
    if (string.IsNullOrWhiteSpace(defaultProtocolPlugin))
    {
        _logger.LogError("请配置默认的端到云协议插件!");
        return;
    }

    // Snapshot the payload as a hex string up front, before the first await.
    var messageHexString = Convert.ToHexString(e.ByteBlock.Span);

    var protocolPlugin = await _protocolService.GetProtocolServiceAsync(defaultProtocolPlugin);
    if (protocolPlugin == null)
    {
        _logger.LogError($"默认的端到云协议插件{_serverApplicationOptions.DefaultProtocolPlugin}不存在!");
        return;
    }

    var tcpSessionClient = (ITcpSessionClient)client;
    TB3761? tB3761 = await protocolPlugin.AnalyzeAsync<TB3761>(tcpSessionClient, messageHexString);
    if (tB3761 == null)
    {
        // TODO: not handled for now; to be handled later.
        _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
    }

    //// TCP connections need to be filtered on login and heartbeat frames
    //if (tB3761.DT?.Fn == (int)FN.登录 || tB3761.DT?.Fn == (int)FN.心跳)
    //{
    // string focusAddress = tB3761.A.Code;
    // if (string.IsNullOrWhiteSpace(focusAddress))// concentrator address is empty: actively close the connection
    // {
    // await tcpSessionClient.CloseAsync();
    // _logger.LogError($"指令解析失败,没有找到集中器地址,指令内容:{messageHexString}");
    // return;
    // }
    // // look up the concentrator info cached in Redis
    // var focusCacheInfos = _freeRedisProvider.Instance.HGet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, focusAddress);
    // if (focusCacheInfos == null)
    // {
    // await tcpSessionClient.CloseAsync();
    // _logger.LogError($"TCP连接关闭没有找到集中器地址{focusAddress}的缓存信息");
    // return;
    // }
    // // if the cached server tag is set, it must match the current server's tag
    // if (!string.IsNullOrWhiteSpace(focusCacheInfos.ServerTagName) && focusCacheInfos.ServerTagName != _serverApplicationOptions.ServerTagName)
    // {
    // await tcpSessionClient.CloseAsync();
    // _logger.LogError($"TCP连接关闭集中器地址{focusAddress}的服务器标记为:{focusCacheInfos.ServerTagName},当前服务器的标记为:{_serverApplicationOptions.ServerTagName},不一致拒绝连接。");
    // return;
    // }
    // int currentTCPConnectionCount = tcpSessionClient.Service.Count;
    // int configTCPConnectionCount = _serverApplicationOptions.TCPConnectionCount;// configured TCP connection limit
    // _logger.LogWarning($"当前配置连接数量为:{configTCPConnectionCount},当前连接数量为:{currentTCPConnectionCount}");
    // if (currentTCPConnectionCount >= configTCPConnectionCount)
    // {
    // _logger.LogError($"当前连接数量为:{currentTCPConnectionCount},大于配置连接数量{configTCPConnectionCount},将拒绝集中{focusAddress}连接");
    // await tcpSessionClient.CloseAsync();
    // return;
    // }
    // // update the cached status
    // focusCacheInfos.LastSurvivalTime = DateTime.Now;
    // focusCacheInfos.ServerTagName = _serverApplicationOptions.ServerTagName;
    // focusCacheInfos.OnLineStatus = true;
    // _freeRedisProvider.Instance.HSet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, focusAddress, focusCacheInfos);
    // // after a successful connection, notify the backend
    // Dictionary<ServiceCommunicationTypeEnum, string> channelMessage = new Dictionary<ServiceCommunicationTypeEnum, string>();
    // channelMessage.Add(ServiceCommunicationTypeEnum.FocusStatusReceived, focusCacheInfos.Serialize());
    // _= _producerService.ProduceAsync(KafkaTopicConsts.ServiceCommunicationChannelTopic, channelMessage);
    //}

    // TODO: consider using a Redis bitmap for daily-active check-ins later.
    // TODO: when saving to the tree model, mind the timestamp field; if the frame has no
    // timestamp, fall back to the receive time for compatibility.
    await e.InvokeNext();
}
/// <summary>
/// Logs that a TCP session is in the process of connecting, then passes the event to the next plugin.
/// </summary>
/// <param name="client">Connecting session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Connecting event arguments used to continue the plugin chain.</param>
public async Task OnTcpConnecting(ITcpSession client, ConnectingEventArgs e)
{
    // Read the id first, then the endpoint text, matching the order they appear in the message.
    var sessionId = ((ITcpSessionClient)client).Id;
    var remoteEndpoint = client.GetIPPort();

    _logger.LogInformation($"[TCP] ID:{sessionId} IP:{remoteEndpoint}正在连接中...");

    await e.InvokeNext();
}
/// <summary>
/// Logs that a TCP session has connected, then passes the event to the next plugin.
/// </summary>
/// <param name="client">Connected session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Connected event arguments used to continue the plugin chain.</param>
public async Task OnTcpConnected(ITcpSession client, ConnectedEventArgs e)
{
    // Read the id first, then the endpoint text, matching the order they appear in the message.
    var sessionId = ((ITcpSessionClient)client).Id;
    var remoteEndpoint = client.GetIPPort();

    _logger.LogInformation($"[TCP] ID:{sessionId} IP:{remoteEndpoint}已连接");

    await e.InvokeNext();
}
/// <summary>
/// Logs that a TCP session has closed and, if a cached concentrator entry exists for it,
/// marks that entry offline in Redis before passing the event to the next plugin.
/// </summary>
/// <param name="client">Closed session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Closed event arguments used to continue the plugin chain.</param>
public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
{
    var session = (ITcpSessionClient)client;

    // The session id doubles as the hash field of the cached concentrator entry.
    // NOTE(review): presumably the id equals the concentrator address — confirm where the id is assigned.
    var hashField = session.Id;
    _logger.LogWarning($"[TCP] ID:{hashField} IP:{client.GetIPPort()}已关闭连接");

    var hashKey = RedisConst.CacheAllFocusInfoHashKey;
    var cachedFocus = _freeRedisProvider.Instance.HGet<FocusCacheInfos>(hashKey, hashField);
    if (cachedFocus != null)
    {
        // Record the time of the close, stamp this server's tag, and flag the entry offline.
        cachedFocus.LastSurvivalTime = DateTime.Now;
        cachedFocus.ServerTagName = _serverApplicationOptions.ServerTagName;
        cachedFocus.OnLineStatus = false;

        _freeRedisProvider.Instance.HSet<FocusCacheInfos>(hashKey, hashField, cachedFocus);
    }

    await e.InvokeNext();
}
/// <summary>
/// Logs that a TCP session is closing, then passes the event to the next plugin.
/// </summary>
/// <remarks>
/// The other handlers in this class all continue the plugin chain with <c>e.InvokeNext()</c>;
/// this one now does the same, so plugins registered after this one also receive the closing event.
/// </remarks>
/// <param name="client">Closing session; it is cast to <see cref="ITcpSessionClient"/> to read its id.</param>
/// <param name="e">Closing event arguments used to continue the plugin chain.</param>
public async Task OnTcpClosing(ITcpSession client, ClosingEventArgs e)
{
    var tcpSessionClient = (ITcpSessionClient)client;
    _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}终端主动关闭");
    await e.InvokeNext();
}
}
}