数据通讯通道消息消费订阅

This commit is contained in:
ChenYi 2025-06-04 13:40:51 +08:00
parent 25989539bf
commit 97f8cdae16
4 changed files with 190 additions and 19 deletions

View File

@ -2,7 +2,7 @@
<!-- 定义项目加载属性 --> <!-- 定义项目加载属性 -->
<PropertyGroup> <PropertyGroup>
<!--JiShe.ServicePro版本--> <!--JiShe.ServicePro版本-->
<ServiceProVersion>1.0.5.04 </ServiceProVersion> <ServiceProVersion>1.0.5.05 </ServiceProVersion>
<!--Volo Abp 版本--> <!--Volo Abp 版本-->
<VoloAbpVersion>9.1.1</VoloAbpVersion> <VoloAbpVersion>9.1.1</VoloAbpVersion>

View File

@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Application.Services;
namespace JiShe.CollectBus.Subscribers
{
/// <summary>
/// 数据通讯通道订阅者服务
/// </summary>
public interface IServiceCommunicationChannelSubscriberService:IApplicationService
{
/// <summary>
/// 数据通讯通道消息消费订阅
/// </summary>
/// <param name="issuedMessage"></param>
/// <returns></returns>
Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage);
}
}

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Common; using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.IotSystems.Ammeters; using JiShe.CollectBus.IotSystems.Ammeters;
using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.Protocol.Interfaces; using JiShe.CollectBus.Protocol.Interfaces;
@ -17,37 +18,37 @@ namespace JiShe.CollectBus.Plugins
{ {
private readonly IProducerService _producerService; private readonly IProducerService _producerService;
private readonly ILogger<TcpMonitor> _logger; private readonly ILogger<TcpMonitor> _logger;
private readonly IRepository<Device, Guid> _deviceRepository;
private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache; private readonly IDistributedCache<AmmeterInfo> _ammeterInfoCache;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private readonly IProtocolService _protocolService; private readonly IProtocolService _protocolService;
private readonly ServerApplicationOptions _serverApplicationOptions; private readonly ServerApplicationOptions _serverApplicationOptions;
private readonly IFreeRedisProvider _freeRedisProvider;
/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
/// <param name="producerService"></param> /// <param name="producerService"></param>
/// <param name="logger"></param> /// <param name="logger"></param>
/// <param name="deviceRepository"></param>
/// <param name="ammeterInfoCache"></param> /// <param name="ammeterInfoCache"></param>
/// <param name="serviceProvider"></param> /// <param name="serviceProvider"></param>
/// <param name="serverApplicationOptions"></param> /// <param name="serverApplicationOptions"></param>
/// <param name="protocolService"></param> /// <param name="protocolService"></param>
/// <param name="freeRedisProvider"></param>
public TcpMonitor(IProducerService producerService, public TcpMonitor(IProducerService producerService,
ILogger<TcpMonitor> logger, ILogger<TcpMonitor> logger,
IRepository<Device, Guid> deviceRepository,
IDistributedCache<AmmeterInfo> ammeterInfoCache, IDistributedCache<AmmeterInfo> ammeterInfoCache,
IServiceProvider serviceProvider, IServiceProvider serviceProvider,
IOptions<ServerApplicationOptions> serverApplicationOptions, IOptions<ServerApplicationOptions> serverApplicationOptions,
IProtocolService protocolService) IProtocolService protocolService,
IFreeRedisProvider freeRedisProvider)
{ {
_producerService = producerService; _producerService = producerService;
_logger = logger; _logger = logger;
_deviceRepository = deviceRepository;
_ammeterInfoCache = ammeterInfoCache; _ammeterInfoCache = ammeterInfoCache;
_serviceProvider= serviceProvider; _serviceProvider= serviceProvider;
_protocolService = protocolService; _protocolService = protocolService;
_serverApplicationOptions = serverApplicationOptions.Value; _serverApplicationOptions = serverApplicationOptions.Value;
_freeRedisProvider = freeRedisProvider;
} }
public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e) public async Task OnTcpReceived(ITcpSession client, ReceivedDataEventArgs e)
@ -71,6 +72,61 @@ namespace JiShe.CollectBus.Plugins
_logger.LogError($"指令初步解析失败,指令内容:{messageHexString}"); _logger.LogError($"指令初步解析失败,指令内容:{messageHexString}");
} }
//登录和心跳的时候需要过滤TCP连接
if (tB3761.DT?.Fn == (int)FN. || tB3761.DT?.Fn == (int)FN.)
{
string focusAddress = tB3761.A.Code;
if (string.IsNullOrWhiteSpace(focusAddress))//集中器地址为空,主动关闭连接
{
await tcpSessionClient.CloseAsync();
_logger.LogError($"指令解析失败,没有找到集中器地址,指令内容:{messageHexString}");
return;
}
//查找Redis缓存的集中器信息
var focusCacheInfos = _freeRedisProvider.Instance.HGet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, focusAddress);
if (focusCacheInfos == null)
{
await tcpSessionClient.CloseAsync();
_logger.LogError($"TCP连接关闭没有找到集中器地址{focusAddress}的缓存信息");
return;
}
//检查集中器的信息的服务器标记是否为空,如果不为空,需要检查是否与当前服务器的标记一致
if (!string.IsNullOrWhiteSpace(focusCacheInfos.ServerTagName) && focusCacheInfos.ServerTagName != _serverApplicationOptions.ServerTagName)
{
await tcpSessionClient.CloseAsync();
_logger.LogError($"TCP连接关闭集中器地址{focusAddress}的服务器标记为:{focusCacheInfos.ServerTagName},当前服务器的标记为:{_serverApplicationOptions.ServerTagName},不一致拒绝连接。");
return;
}
int currentTCPConnectionCount = tcpSessionClient.Service.Count;
int configTCPConnectionCount = _serverApplicationOptions.TCPConnectionCount;//配置TCP连接数量
_logger.LogWarning($"当前配置连接数量为:{configTCPConnectionCount},当前连接数量为:{currentTCPConnectionCount}");
if (currentTCPConnectionCount >= configTCPConnectionCount)
{
_logger.LogError($"当前连接数量为:{currentTCPConnectionCount},大于配置连接数量{configTCPConnectionCount},将拒绝集中{focusAddress}连接");
await tcpSessionClient.CloseAsync();
return;
}
//缓存更新状态
focusCacheInfos.LastSurvivalTime = DateTime.Now;
focusCacheInfos.ServerTagName = _serverApplicationOptions.ServerTagName;
focusCacheInfos.OnLineStatus = true;
_freeRedisProvider.Instance.HSet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, focusAddress, focusCacheInfos);
//连接成功以后,通知信息到后台
Dictionary<ServiceCommunicationTypeEnum, string> channelMessage = new Dictionary<ServiceCommunicationTypeEnum, string>();
channelMessage.Add(ServiceCommunicationTypeEnum.FocusStatusReceived, focusCacheInfos.Serialize());
_= _producerService.ProduceAsync(KafkaTopicConsts.ServiceCommunicationChannelTopic, channelMessage);
}
//todo 后续可以考虑Redis的bitmap做日活签到。
// TODO要注意保存树模型的时时标字段如果没有时标,则取接受时间做兼容 // TODO要注意保存树模型的时时标字段如果没有时标,则取接受时间做兼容
await e.InvokeNext(); await e.InvokeNext();
} }
@ -95,20 +151,22 @@ namespace JiShe.CollectBus.Plugins
public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e) public async Task OnTcpClosed(ITcpSession client, ClosedEventArgs e)
{ {
//todo: 删除Redis缓存
var tcpSessionClient = (ITcpSessionClient)client; var tcpSessionClient = (ITcpSessionClient)client;
//var entity = await _deviceRepository.FindAsync(a => a.ClientId == tcpSessionClient.Id);
//if (entity != null)
//{
// entity.UpdateByOnClosed();
// await _deviceRepository.UpdateAsync(entity);
//}
//else
//{
//_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接,但采集程序检索失败");
//}
_logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接"); _logger.LogWarning($"[TCP] ID:{tcpSessionClient.Id} IP:{client.GetIPPort()}已关闭连接");
//查找Redis缓存的集中器信息
var focusCacheInfos = _freeRedisProvider.Instance.HGet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, tcpSessionClient.Id);
if (focusCacheInfos != null)
{
//缓存更新状态
focusCacheInfos.LastSurvivalTime = DateTime.Now;
focusCacheInfos.ServerTagName = _serverApplicationOptions.ServerTagName;
focusCacheInfos.OnLineStatus = false;
_freeRedisProvider.Instance.HSet<FocusCacheInfos>(RedisConst.CacheAllFocusInfoHashKey, tcpSessionClient.Id, focusCacheInfos);
}
await e.InvokeNext(); await e.InvokeNext();
} }

View File

@ -0,0 +1,91 @@
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Subscribers
{
public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IoTDBSessionPoolProvider _dbProvider;
private readonly IProtocolService _protocolService;
public ServiceCommunicationChannelSubscriberService(ILogger<SubscriberAnalysisAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IoTDBSessionPoolProvider dbProvider,
IProtocolService protocolService)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_dbProvider = dbProvider;
_protocolService = protocolService;
}
/// <summary>
/// 数据通讯通道消息消费订阅
/// </summary>
/// <param name="issuedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)]
public async Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage)
{
Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}");
var tempFirstKeyInfo = issuedMessage.FirstOrDefault();
if (tempFirstKeyInfo.Value == null)
{
return SubscribeAck.Fail();
}
ServiceCommunicationTypeEnum serviceCommunication = tempFirstKeyInfo.Key;
bool tempResult = false;
switch (serviceCommunication)
{
case ServiceCommunicationTypeEnum.ArchivalDataIssued:
if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value))
{
FocusCacheInfos focusCacheInfo = tempFirstKeyInfo.Value.Deserialize<FocusCacheInfos>();
tempResult = await SendArchivalDataIssued(focusCacheInfo);
}
break;
default:
throw new Exception("暂不支持该数据通讯通道消息消费订阅解析");
}
return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail();
}
protected async Task<bool> SendArchivalDataIssued(FocusCacheInfos focusCacheInfo)
{
try
{
var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress);
if (checkResult)
{
string issuedMessageHexString = "";
await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString));
return true;
}
else
{
return false;
}
}
catch (Exception)
{
throw;
}
}
}
}