65 lines
2.9 KiB
C#
65 lines
2.9 KiB
C#
|
|
using System;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
using JiShe.CollectBus.Common.Enums;
|
|||
|
|
using JiShe.CollectBus.MessageIssueds;
|
|||
|
|
using JiShe.CollectBus.MessageReceiveds;
|
|||
|
|
using MassTransit;
|
|||
|
|
using Microsoft.Extensions.Logging;
|
|||
|
|
using TouchSocket.Sockets;
|
|||
|
|
using Volo.Abp.Domain.Repositories;
|
|||
|
|
|
|||
|
|
namespace JiShe.CollectBus.Consumers
|
|||
|
|
{
|
|||
|
|
/// <summary>
/// MassTransit consumer for <see cref="MessageIssued"/> messages.
/// For heartbeat and login messages it flags the matching stored received-message record
/// as acknowledged; for every supported message type it then sends the message payload
/// to the target client through the TCP service.
/// </summary>
public class IssuedConsumer : IConsumer<MessageIssued>
{
    // NOTE(review): the logger category is ReceivedHeartbeatConsumer rather than IssuedConsumer,
    // so entries written here are attributed to the other consumer. This looks like a copy/paste
    // slip; it is left unchanged because the type is part of the public constructor signature.
    private readonly ILogger<ReceivedHeartbeatConsumer> _logger;
    private readonly ITcpService _tcpService;
    private readonly IRepository<MessageReceivedLogin, Guid> _messageReceivedLoginEventRepository;
    private readonly IRepository<MessageReceivedHeartbeat, Guid> _messageReceivedHeartbeatEventRepository;

    /// <summary>
    /// Creates the consumer with its collaborators.
    /// </summary>
    /// <param name="logger">Logger used to trace issued heartbeat events.</param>
    /// <param name="tcpService">TCP service used to send the issued payload to the client.</param>
    /// <param name="messageReceivedLoginEventRepository">Repository of received login messages.</param>
    /// <param name="messageReceivedHeartbeatEventRepository">Repository of received heartbeat messages.</param>
    public IssuedConsumer(ILogger<ReceivedHeartbeatConsumer> logger,
        ITcpService tcpService,
        IRepository<MessageReceivedLogin, Guid> messageReceivedLoginEventRepository,
        IRepository<MessageReceivedHeartbeat, Guid> messageReceivedHeartbeatEventRepository)
    {
        _logger = logger;
        _tcpService = tcpService;
        _messageReceivedLoginEventRepository = messageReceivedLoginEventRepository;
        _messageReceivedHeartbeatEventRepository = messageReceivedHeartbeatEventRepository;
    }

    /// <summary>
    /// Handles one issued message: updates the acknowledgement state of the related
    /// received-message record (heartbeat and login only) and then sends the payload
    /// to the client identified by the message.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="MessageIssued"/> message.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The message type is not one of the handled <see cref="IssuedEventType"/> values.
    /// </exception>
    public async Task Consume(ConsumeContext<MessageIssued> context)
    {
        var message = context.Message;

        switch (message.Type)
        {
            case IssuedEventType.Heartbeat:
                // Constant template + argument: the id is logged as a structured value and
                // braces inside it can no longer be misread as template placeholders.
                _logger.LogInformation("IssuedEvent:{MessageId}", message.MessageId);
                await AcknowledgeHeartbeatAsync(message);
                break;

            case IssuedEventType.Login:
                await AcknowledgeLoginAsync(message);
                break;

            case IssuedEventType.Data:
                // Data messages have no acknowledgement record to update; they are only sent.
                break;

            default:
                throw new ArgumentOutOfRangeException(
                    nameof(context),
                    message.Type,
                    "Unsupported issued event type.");
        }

        // Reached for every supported type once any acknowledgement update has completed.
        await _tcpService.SendAsync(message.ClientId, message.Message);
    }

    // Marks the received heartbeat record that shares the issued message's id as acknowledged.
    private async Task AcknowledgeHeartbeatAsync(MessageIssued message)
    {
        var messageId = message.MessageId;
        var heartbeatEntity = await _messageReceivedHeartbeatEventRepository.GetAsync(a => a.MessageId == messageId);

        // NOTE(review): local server time is stored here; confirm whether UTC is expected.
        heartbeatEntity.AckTime = DateTime.Now;
        heartbeatEntity.IsAck = true;
        await _messageReceivedHeartbeatEventRepository.UpdateAsync(heartbeatEntity);
    }

    // Marks the received login record that shares the issued message's id as acknowledged.
    private async Task AcknowledgeLoginAsync(MessageIssued message)
    {
        var messageId = message.MessageId;
        var loginEntity = await _messageReceivedLoginEventRepository.GetAsync(a => a.MessageId == messageId);

        // NOTE(review): local server time is stored here; confirm whether UTC is expected.
        loginEntity.AckTime = DateTime.Now;
        loginEntity.IsAck = true;
        await _messageReceivedLoginEventRepository.UpdateAsync(loginEntity);
    }
}
|
|||
|
|
}
|