JiShe.CollectBus/services/JiShe.CollectBus.Application/Subscribers/ServiceCommunicationChannelSubscriberService.cs

92 lines
3.4 KiB
C#
Raw Normal View History

2025-06-04 13:40:51 +08:00
using JiShe.CollectBus.Protocol.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using TouchSocket.Sockets;
namespace JiShe.CollectBus.Subscribers
{
public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe
{
private readonly ILogger<SubscriberAnalysisAppService> _logger;
private readonly ITcpService _tcpService;
private readonly IServiceProvider _serviceProvider;
private readonly IoTDBSessionPoolProvider _dbProvider;
private readonly IProtocolService _protocolService;
public ServiceCommunicationChannelSubscriberService(ILogger<SubscriberAnalysisAppService> logger,
ITcpService tcpService,
IServiceProvider serviceProvider,
IoTDBSessionPoolProvider dbProvider,
IProtocolService protocolService)
{
_logger = logger;
_tcpService = tcpService;
_serviceProvider = serviceProvider;
_dbProvider = dbProvider;
_protocolService = protocolService;
}
/// <summary>
/// 数据通讯通道消息消费订阅
/// </summary>
/// <param name="issuedMessage"></param>
/// <returns></returns>
[KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)]
public async Task<ISubscribeAck> ServiceCommunicationChannelIssuedEvent(Dictionary<ServiceCommunicationTypeEnum, string> issuedMessage)
{
Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}");
var tempFirstKeyInfo = issuedMessage.FirstOrDefault();
if (tempFirstKeyInfo.Value == null)
{
return SubscribeAck.Fail();
}
ServiceCommunicationTypeEnum serviceCommunication = tempFirstKeyInfo.Key;
bool tempResult = false;
switch (serviceCommunication)
{
case ServiceCommunicationTypeEnum.ArchivalDataIssued:
if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value))
{
FocusCacheInfos focusCacheInfo = tempFirstKeyInfo.Value.Deserialize<FocusCacheInfos>();
tempResult = await SendArchivalDataIssued(focusCacheInfo);
}
break;
default:
throw new Exception("暂不支持该数据通讯通道消息消费订阅解析");
}
return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail();
}
protected async Task<bool> SendArchivalDataIssued(FocusCacheInfos focusCacheInfo)
{
try
{
var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress);
if (checkResult)
{
string issuedMessageHexString = "";
await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString));
return true;
}
else
{
return false;
}
}
catch (Exception)
{
throw;
}
}
}
}