using System.Security.Cryptography.X509Certificates;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
using Microsoft.Extensions.Logging;
using TouchSocket.Sockets;

namespace JiShe.CollectBus.RabbitMQ.Consumers
{
    /// <summary>
    /// Consumer for issued messages: marks the matching heartbeat or login record as
    /// acknowledged through the Mongo repositories, then sends the message payload to
    /// the target client over the TCP service.
    /// </summary>
    /// <remarks>
    /// NOTE(review): no generic type arguments are visible on <c>IConsumer</c>,
    /// <c>ILogger</c>, <c>IMongoRepository</c> or <c>ConsumeContext</c> in this file —
    /// confirm the intended closed types against the project before building.
    /// </remarks>
    public class MessageIssuedConsumer : IConsumer
    {
        // Stored for diagnostics; not used by any member visible in this class.
        private readonly ILogger _logger;

        // Used to push the issued payload to the connected client.
        private readonly ITcpService _tcpService;

        // Repository updated when a heartbeat message is issued.
        public readonly IMongoRepository _mongoHeartbeatRepository;

        // Repository updated when a login message is issued.
        public readonly IMongoRepository _mongoLoginRepository;

        /// <summary>
        /// Creates the consumer and stores its collaborators.
        /// </summary>
        /// <param name="logger">Logger instance kept by the consumer.</param>
        /// <param name="tcpService">TCP service used to send the payload to the client.</param>
        /// <param name="mongoHeartbeatRepository">Repository holding heartbeat records.</param>
        /// <param name="mongoLoginRepository">Repository holding login records.</param>
        public MessageIssuedConsumer(
            ILogger logger,
            ITcpService tcpService,
            IMongoRepository mongoHeartbeatRepository,
            IMongoRepository mongoLoginRepository)
        {
            _logger = logger;
            _tcpService = tcpService;
            _mongoHeartbeatRepository = mongoHeartbeatRepository;
            _mongoLoginRepository = mongoLoginRepository;
        }

        /// <summary>
        /// Handles one issued message. Heartbeat and login messages first flag the record
        /// with the same <c>MessageId</c> as acknowledged (<c>IsAck = true</c> plus an
        /// <c>AckTime</c>); data messages update nothing. Afterwards the payload is sent to
        /// <c>context.Message.ClientId</c>.
        /// </summary>
        /// <param name="context">Consume context carrying the issued message.</param>
        /// <returns>A task that completes once the payload has been handed to the TCP service.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The message type is not one of the handled <see cref="IssuedEventType"/> values.
        /// Nothing is sent to the client in that case.
        /// </exception>
        public async Task Consume(ConsumeContext context)
        {
            // NOTE(review): the update lambdas are kept inline and unchanged; presumably the
            // repository interprets them as expression trees — verify before refactoring them.
            // NOTE(review): AckTime is taken from DateTime.Now (local time) — confirm this
            // matches how the other timestamps of these records are stored.
            switch (context.Message.Type)
            {
                case IssuedEventType.Heartbeat:
                    await _mongoHeartbeatRepository.UpdateAsync(
                        a => a.MessageId == context.Message.MessageId,
                        b => new MessageReceivedHeartbeatEvent { IsAck = true, AckTime = DateTime.Now });
                    break;

                case IssuedEventType.Login:
                    await _mongoLoginRepository.UpdateAsync(
                        a => a.MessageId == context.Message.MessageId,
                        b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now });
                    break;

                case IssuedEventType.Data:
                    // Data messages have no acknowledgement record to update.
                    break;

                default:
                    // Include the parameter name and the offending value so an unsupported
                    // type can be diagnosed from the fault alone (same exception type as before).
                    throw new ArgumentOutOfRangeException(
                        nameof(context),
                        context.Message.Type,
                        "Unsupported issued event type.");
            }

            // Runs for every handled type, after any acknowledgement update above.
            await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
        }
    }
}