This commit is contained in:
Dai Mr 2024-11-08 14:53:41 +08:00
commit 7642a1c6bb
6 changed files with 94 additions and 43 deletions

View File

@ -6,6 +6,7 @@ using TouchSocket.Sockets;
using JiShe.CollectBus.RabbitMQ.Senders;
using Microsoft.Extensions.Logging;
using JiShe.CollectBus.MongoDB;
using MassTransit;
namespace JiShe.CollectBus.Core.Plugins
{
@ -51,7 +52,7 @@ namespace JiShe.CollectBus.Core.Plugins
MessageHexString = messageHexString,
ReceivedTime = DateTime.Now,
DeviceNo = aTuple.Item1,
MessageId = Guid.NewGuid().ToString()
MessageId = NewId.NextGuid().ToString()
};
await _mongoLoginRepository.AddAsync(messageReceivedLoginEvent);
await _nSender.SendToReceivedLoginAsync(messageReceivedLoginEvent);
@ -65,7 +66,7 @@ namespace JiShe.CollectBus.Core.Plugins
MessageHexString = messageHexString,
ReceivedTime = DateTime.Now,
DeviceNo = aTuple.Item1,
MessageId = Guid.NewGuid().ToString()
MessageId = NewId.NextGuid().ToString()
};
await _mongoLoginRepository.AddAsync(messageReceivedExitLoginEvent);
await _nSender.SendToReceivedLoginAsync(messageReceivedExitLoginEvent);
@ -79,7 +80,7 @@ namespace JiShe.CollectBus.Core.Plugins
MessageHexString = messageHexString,
ReceivedTime = DateTime.Now,
DeviceNo = aTuple.Item1,
MessageId = Guid.NewGuid().ToString()
MessageId = NewId.NextGuid().ToString()
};
await _mongoHeartbeatRepository.AddAsync(messageReceivedHeartbeatEvent);
await _nSender.SendToReceivedHeartbeatAsync(messageReceivedHeartbeatEvent);
@ -95,7 +96,7 @@ namespace JiShe.CollectBus.Core.Plugins
ClientPort = client.Port,
MessageHexString = messageHexString,
DeviceNo = aTuple.Item1,
MessageId = Guid.NewGuid().ToString()
MessageId = NewId.NextGuid().ToString()
});
}
}

View File

@ -1,5 +1,4 @@
using System.Security.Cryptography.X509Certificates;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.Common.Enums;
using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
@ -8,38 +7,29 @@ using TouchSocket.Sockets;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageIssuedConsumer : IConsumer<MessageIssuedEvent>
public class MessageIssuedConsumer(
ILogger<MessageIssuedEvent> logger,
ITcpService tcpService,
IMongoRepository<MessageReceivedHeartbeatEvent> mongoHeartbeatRepository,
IMongoRepository<MessageReceivedLoginEvent> mongoLoginRepository)
: IConsumer<MessageIssuedEvent>
{
private readonly ILogger<MessageIssuedEvent> _logger;
private readonly ITcpService _tcpService;
public readonly IMongoRepository<MessageReceivedHeartbeatEvent> _mongoHeartbeatRepository;
public readonly IMongoRepository<MessageReceivedLoginEvent> _mongoLoginRepository;
public MessageIssuedConsumer(ILogger<MessageIssuedEvent> logger, ITcpService tcpService, IMongoRepository<MessageReceivedHeartbeatEvent> mongoHeartbeatRepository, IMongoRepository<MessageReceivedLoginEvent> mongoLoginRepository)
{
_logger = logger;
_tcpService = tcpService;
_mongoHeartbeatRepository = mongoHeartbeatRepository;
_mongoLoginRepository = mongoLoginRepository;
}
public async Task Consume(ConsumeContext<MessageIssuedEvent> context)
{
switch (context.Message.Type)
{
case IssuedEventType.Heartbeat:
await _mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b =>new MessageReceivedHeartbeatEvent { IsAck = true,AckTime=DateTime.Now});
await mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedHeartbeatEvent { IsAck = true, AckTime = DateTime.Now });
break;
case IssuedEventType.Login:
await _mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now });
await mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now });
break;
case IssuedEventType.Data:
break;
default:
throw new ArgumentOutOfRangeException();
}
await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
await tcpService.SendAsync(context.Message.ClientId, context.Message.Message);
}
}
}

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageIssuedFaultConsumer : IConsumer<Fault<MessageIssuedEvent>>
{
public Task Consume(ConsumeContext<Fault<MessageIssuedEvent>> context)
{
throw new NotImplementedException();
}
}
}

View File

@ -2,36 +2,37 @@
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Net.Sockets;
using JiShe.CollectBus.Protocol.Contracts.Models;
using TouchSocket.Sockets;
using JiShe.CollectBus.MongoDB;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageReceivedConsumer : IConsumer<MessageReceivedEvent>
/// <summary>
/// Batch 一次最多 100 个,最多 10 个并发批次
/// </summary>
public class MessageReceivedConsumer(
ILogger<MessageReceivedConsumer> logger,
IServiceProvider serviceProvider,
IMongoRepository<MessageReceivedEvent> mongoReceivedRepository)
: IConsumer<Batch<MessageReceivedEvent>>
{
private readonly ILogger<MessageReceivedConsumer> _logger;
private readonly IServiceProvider _serviceProvider;
public MessageReceivedConsumer(ILogger<MessageReceivedConsumer> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
public async Task Consume(ConsumeContext<MessageReceivedEvent> context)
public async Task Consume(ConsumeContext<Batch<MessageReceivedEvent>> context)
{
const string protocolType = "StandardProtocol";
var protocolPlugin = _serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
var protocolPlugin = serviceProvider.GetKeyedService<IProtocolPlugin>(protocolType);
if (protocolPlugin == null)
{
_logger.LogError("协议不存在!");
logger.LogError("协议不存在!");
}
else
{
await protocolPlugin.AnalyzeAsync(context.Message);
var list = new List<MessageReceivedEvent>();
foreach (var contextItem in context.Message)
{
await protocolPlugin.AnalyzeAsync(contextItem.Message);
list.Add(contextItem.Message);
}
await mongoReceivedRepository.InsertManyAsync(list);
}
}
}

View File

@ -0,0 +1,18 @@
using JiShe.CollectBus.Protocol.Contracts.Models;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace JiShe.CollectBus.RabbitMQ.Consumers
{
public class MessageReceivedFaultConsumer: IConsumer<Fault<Batch<MessageReceivedEvent>>>
{
public Task Consume(ConsumeContext<Fault<Batch<MessageReceivedEvent>>> context)
{
throw new NotImplementedException();
}
}
}

View File

@ -3,6 +3,7 @@ using JiShe.CollectBus.RabbitMQ.Consumers;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Data;
namespace JiShe.CollectBus.RabbitMQ
{
@ -13,10 +14,32 @@ namespace JiShe.CollectBus.RabbitMQ
var configuration = hostContext.Configuration;
services.AddMassTransit(x =>
{
x.AddConsumer<MessageReceivedConsumer>();
x.AddConsumer<MessageReceivedConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
//.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
x.AddConsumer<MessageIssuedConsumer>();
x.AddConsumer<MessageReceivedLoginConsumer>();
x.AddConsumer<MessageReceivedHeartbeatConsumer>();
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox(context);
//cfg.UseMessageRetry(r =>
//{
// r.Immediate(5);
// r.Handle<DataException>(x => x.Message.Contains("SQL"));
//});
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>