diff --git a/JiShe.CollectBus.Core/Plugins/TcpServicePlugin.cs b/JiShe.CollectBus.Core/Plugins/TcpServicePlugin.cs index 0b0f1dc..54cddc5 100644 --- a/JiShe.CollectBus.Core/Plugins/TcpServicePlugin.cs +++ b/JiShe.CollectBus.Core/Plugins/TcpServicePlugin.cs @@ -6,6 +6,7 @@ using TouchSocket.Sockets; using JiShe.CollectBus.RabbitMQ.Senders; using Microsoft.Extensions.Logging; using JiShe.CollectBus.MongoDB; +using MassTransit; namespace JiShe.CollectBus.Core.Plugins { @@ -51,7 +52,7 @@ namespace JiShe.CollectBus.Core.Plugins MessageHexString = messageHexString, ReceivedTime = DateTime.Now, DeviceNo = aTuple.Item1, - MessageId = Guid.NewGuid().ToString() + MessageId = NewId.NextGuid().ToString() }; await _mongoLoginRepository.AddAsync(messageReceivedLoginEvent); await _nSender.SendToReceivedLoginAsync(messageReceivedLoginEvent); @@ -65,7 +66,7 @@ namespace JiShe.CollectBus.Core.Plugins MessageHexString = messageHexString, ReceivedTime = DateTime.Now, DeviceNo = aTuple.Item1, - MessageId = Guid.NewGuid().ToString() + MessageId = NewId.NextGuid().ToString() }; await _mongoLoginRepository.AddAsync(messageReceivedExitLoginEvent); await _nSender.SendToReceivedLoginAsync(messageReceivedExitLoginEvent); @@ -79,7 +80,7 @@ namespace JiShe.CollectBus.Core.Plugins MessageHexString = messageHexString, ReceivedTime = DateTime.Now, DeviceNo = aTuple.Item1, - MessageId = Guid.NewGuid().ToString() + MessageId = NewId.NextGuid().ToString() }; await _mongoHeartbeatRepository.AddAsync(messageReceivedHeartbeatEvent); await _nSender.SendToReceivedHeartbeatAsync(messageReceivedHeartbeatEvent); @@ -95,7 +96,7 @@ namespace JiShe.CollectBus.Core.Plugins ClientPort = client.Port, MessageHexString = messageHexString, DeviceNo = aTuple.Item1, - MessageId = Guid.NewGuid().ToString() + MessageId = NewId.NextGuid().ToString() }); } } diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs index b518b67..e66c3ad 100644 --- a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedConsumer.cs @@ -1,5 +1,4 @@ -using System.Security.Cryptography.X509Certificates; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.Protocol.Contracts.Models; using MassTransit; @@ -8,38 +7,29 @@ using TouchSocket.Sockets; namespace JiShe.CollectBus.RabbitMQ.Consumers { - public class MessageIssuedConsumer : IConsumer + public class MessageIssuedConsumer( + ILogger logger, + ITcpService tcpService, + IMongoRepository mongoHeartbeatRepository, + IMongoRepository mongoLoginRepository) + : IConsumer { - private readonly ILogger _logger; - private readonly ITcpService _tcpService; - public readonly IMongoRepository _mongoHeartbeatRepository; - public readonly IMongoRepository _mongoLoginRepository; - - - public MessageIssuedConsumer(ILogger logger, ITcpService tcpService, IMongoRepository mongoHeartbeatRepository, IMongoRepository mongoLoginRepository) - { - _logger = logger; - _tcpService = tcpService; - _mongoHeartbeatRepository = mongoHeartbeatRepository; - _mongoLoginRepository = mongoLoginRepository; - } - public async Task Consume(ConsumeContext context) { switch (context.Message.Type) { case IssuedEventType.Heartbeat: - await _mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b =>new MessageReceivedHeartbeatEvent { IsAck = true,AckTime=DateTime.Now}); + await mongoHeartbeatRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedHeartbeatEvent { IsAck = true, AckTime = DateTime.Now }); break; case IssuedEventType.Login: - await _mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now }); + await mongoLoginRepository.UpdateAsync(a => a.MessageId == context.Message.MessageId, b => new MessageReceivedLoginEvent { IsAck = true, AckTime = DateTime.Now }); break; case IssuedEventType.Data: break; default: throw new ArgumentOutOfRangeException(); } - await _tcpService.SendAsync(context.Message.ClientId, context.Message.Message); + await tcpService.SendAsync(context.Message.ClientId, context.Message.Message); } } } diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedFaultConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedFaultConsumer.cs new file mode 100644 index 0000000..8b4eed9 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageIssuedFaultConsumer.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.Protocol.Contracts.Models; +using MassTransit; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + public class MessageIssuedFaultConsumer : IConsumer> + { + public Task Consume(ConsumeContext> context) + { + throw new NotImplementedException(); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs index 85733a7..a4f8dbe 100644 --- a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedConsumer.cs @@ -2,36 +2,37 @@ using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using System; -using System.Net.Sockets; using JiShe.CollectBus.Protocol.Contracts.Models; -using TouchSocket.Sockets; +using JiShe.CollectBus.MongoDB; namespace JiShe.CollectBus.RabbitMQ.Consumers { - public class MessageReceivedConsumer : IConsumer + /// + /// Batch 一次最多 100 个,最多 10 个并发批次 + /// + public class MessageReceivedConsumer( + ILogger logger, + IServiceProvider serviceProvider, + IMongoRepository mongoReceivedRepository) + : IConsumer> { - - private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; - - public MessageReceivedConsumer(ILogger logger, IServiceProvider serviceProvider) - { - _logger = logger; - _serviceProvider = serviceProvider; - } - - public async Task Consume(ConsumeContext context) + public async Task Consume(ConsumeContext> context) { const string protocolType = "StandardProtocol"; - var protocolPlugin = _serviceProvider.GetKeyedService(protocolType); + var protocolPlugin = serviceProvider.GetKeyedService(protocolType); if (protocolPlugin == null) { - _logger.LogError("协议不存在!"); + logger.LogError("协议不存在!"); } else { - await protocolPlugin.AnalyzeAsync(context.Message); + var list = new List(); + foreach (var contextItem in context.Message) + { + await protocolPlugin.AnalyzeAsync(contextItem.Message); + list.Add(contextItem.Message); + } + await mongoReceivedRepository.InsertManyAsync(list); } } } diff --git a/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedFaultConsumer.cs b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedFaultConsumer.cs new file mode 100644 index 0000000..0352817 --- /dev/null +++ b/JiShe.CollectBus.RabbitMQ/Consumers/MessageReceivedFaultConsumer.cs @@ -0,0 +1,18 @@ +using JiShe.CollectBus.Protocol.Contracts.Models; +using MassTransit; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.RabbitMQ.Consumers +{ + public class MessageReceivedFaultConsumer: IConsumer>> + { + public Task Consume(ConsumeContext>> context) + { + throw new NotImplementedException(); + } + } +} diff --git a/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs index 86c9d33..a98e9c9 100644 --- a/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs +++ b/JiShe.CollectBus.RabbitMQ/JiSheCollectBusRabbitMqModule.cs @@ -3,6 +3,7 @@ using JiShe.CollectBus.RabbitMQ.Consumers; using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using System.Data; namespace JiShe.CollectBus.RabbitMQ { @@ -13,10 +14,32 @@ namespace JiShe.CollectBus.RabbitMQ var configuration = hostContext.Configuration; services.AddMassTransit(x => { - x.AddConsumer(); + x.AddConsumer(cfg => + { + cfg.Options(options => options + .SetMessageLimit(100) + .SetTimeLimit(s: 1) + .SetTimeLimitStart(BatchTimeLimitStart.FromLast) + //.GroupBy(x => x.CustomerId) + .SetConcurrencyLimit(10)); + }); x.AddConsumer(); x.AddConsumer(); x.AddConsumer(); + + x.AddConfigureEndpointsCallback((context, name, cfg) => + { + cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); + cfg.UseMessageRetry(r => r.Immediate(5)); + cfg.UseInMemoryOutbox(context); + + //cfg.UseMessageRetry(r => + //{ + // r.Immediate(5); + // r.Handle(x => x.Message.Contains("SQL")); + //}); + }); + x.UsingRabbitMq((context, cfg) => { cfg.Host(configuration["MQ:Host"], ushort.Parse(configuration["MQ:Port"] ?? string.Empty), configuration["MQ:VirtualHost"], h =>