using System; using System.Threading.Tasks; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using MassTransit; using Microsoft.Extensions.Logging; using TouchSocket.Sockets; using Volo.Abp.Domain.Repositories; namespace JiShe.CollectBus.Consumers { public class IssuedConsumer: IConsumer { private readonly ILogger _logger; private readonly ITcpService _tcpService; private readonly IRepository _messageReceivedLoginEventRepository; private readonly IRepository _messageReceivedHeartbeatEventRepository; /// /// IssuedConsumer /// /// /// /// /// public IssuedConsumer(ILogger logger, ITcpService tcpService, IRepository messageReceivedLoginEventRepository, IRepository messageReceivedHeartbeatEventRepository) { _logger = logger; _tcpService = tcpService; _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository; _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository; } public async Task Consume(ConsumeContext context) { switch (context.Message.Type) { case IssuedEventType.Heartbeat: _logger.LogInformation($"IssuedEvent:{context.Message.MessageId}"); var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == context.Message.MessageId); heartbeatEntity.AckTime = DateTime.Now; heartbeatEntity.IsAck = true; await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity); break; case IssuedEventType.Login: var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == context.Message.MessageId); loginEntity.AckTime = DateTime.Now; loginEntity.IsAck = true; await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity); break; case IssuedEventType.Data: break; default: throw new ArgumentOutOfRangeException(); } await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); } } }