using JiShe.CollectBus.Protocol.Interfaces; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using TouchSocket.Sockets; namespace JiShe.CollectBus.Subscribers { public class ServiceCommunicationChannelSubscriberService : CollectBusAppService, IServiceCommunicationChannelSubscriberService, IKafkaSubscribe { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IServiceProvider _serviceProvider; private readonly IoTDBSessionPoolProvider _dbProvider; private readonly IProtocolService _protocolService; public ServiceCommunicationChannelSubscriberService(ILogger logger, ITcpService tcpService, IServiceProvider serviceProvider, IoTDBSessionPoolProvider dbProvider, IProtocolService protocolService) { _logger = logger; _tcpService = tcpService; _serviceProvider = serviceProvider; _dbProvider = dbProvider; _protocolService = protocolService; } /// /// 数据通讯通道消息消费订阅 /// /// /// [KafkaSubscribe(KafkaTopicConsts.ServiceCommunicationChannelTopic)] public async Task ServiceCommunicationChannelIssuedEvent(Dictionary issuedMessage) { Logger.LogWarning($"数据通讯通道消息消费订阅解析:{issuedMessage.Serialize()}"); var tempFirstKeyInfo = issuedMessage.FirstOrDefault(); if (tempFirstKeyInfo.Value == null) { return SubscribeAck.Fail(); } ServiceCommunicationTypeEnum serviceCommunication = tempFirstKeyInfo.Key; bool tempResult = false; switch (serviceCommunication) { case ServiceCommunicationTypeEnum.ArchivalDataIssued: if(!string.IsNullOrWhiteSpace(tempFirstKeyInfo.Value)) { FocusCacheInfos focusCacheInfo = tempFirstKeyInfo.Value.Deserialize(); tempResult = await SendArchivalDataIssued(focusCacheInfo); } break; default: throw new Exception("暂不支持该数据通讯通道消息消费订阅解析"); } return tempResult == true ? SubscribeAck.Success() : SubscribeAck.Fail(); } protected async Task SendArchivalDataIssued(FocusCacheInfos focusCacheInfo) { try { var checkResult = _tcpService.ClientExists(focusCacheInfo.FocusAddress); if (checkResult) { string issuedMessageHexString = ""; await _tcpService.SendAsync(focusCacheInfo.FocusAddress, Convert.FromHexString(issuedMessageHexString)); return true; } else { return false; } } catch (Exception) { throw; } } } }