优化代码

This commit is contained in:
ChenYi 2025-04-07 17:35:37 +08:00
parent 7696b7379b
commit 1b4c4dd8ff
3 changed files with 53 additions and 22 deletions

View File

@ -16,6 +16,13 @@ using Volo.Abp.BackgroundWorkers.Hangfire;
using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.MongoDB;
using JiShe.CollectBus.ScheduledMeterReading; using JiShe.CollectBus.ScheduledMeterReading;
using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IoTDBProvider;
using Confluent.Kafka.Admin;
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using JiShe.CollectBus.Protocol.Contracts;
using System.Collections.Generic;
using Thrift;
using Microsoft.Extensions.Configuration;
namespace JiShe.CollectBus; namespace JiShe.CollectBus;

View File

@ -23,6 +23,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds;
using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageIssueds;
using Confluent.Kafka; using Confluent.Kafka;
using MassTransit.SqlTransport.Topology; using MassTransit.SqlTransport.Topology;
using Confluent.Kafka.Admin;
namespace JiShe.CollectBus.Host namespace JiShe.CollectBus.Host
@ -286,29 +287,51 @@ namespace JiShe.CollectBus.Host
/// <param name="configuration">The configuration.</param> /// <param name="configuration">The configuration.</param>
public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration)
{ {
var adminClient = new AdminClientBuilder(new AdminClientConfig
{
BootstrapServers = configuration.GetConnectionString("Kafka")
}).Build();
try
{
var topics = ProtocolConstExtensions.GetAllTopicNames();
List<TopicSpecification> topicSpecifications = new List<TopicSpecification>();
foreach (var item in topics)
{
topicSpecifications.Add(new TopicSpecification
{
Name = item,
NumPartitions = 3,
ReplicationFactor = 1
});
}
adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult();
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
throw;
}
}
var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup };
var producerConfig = new ProducerConfig(); var producerConfig = new ProducerConfig();
context.Services context.Services
// .ConfigureKafkaTestOptions(options =>
// {
//#if DEBUG
// options.CleanTopicsOnStart = true;// 测试时每次启动都删除topic,生产环境不需要
//#endif
// options.CreateTopicsIfNotExists = true;
// options.TopicNames = ProtocolConstExtensions.GetAllTopicNames();
// })
.AddMassTransit(x => .AddMassTransit(x =>
{ {
x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context)); x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
//x.AddConfigureEndpointsCallback((c, name, cfg) => x.AddConfigureEndpointsCallback((c, name, cfg) =>
//{ {
// cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
// cfg.UseMessageRetry(r => r.Immediate(5)); cfg.UseMessageRetry(r => r.Immediate(5));
// cfg.UseInMemoryOutbox(c); cfg.UseInMemoryOutbox(c);
//}); });
x.AddRider(rider => x.AddRider(rider =>
{ {
@ -323,13 +346,14 @@ namespace JiShe.CollectBus.Host
.SetTimeLimitStart(BatchTimeLimitStart.FromLast) .SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.SetConcurrencyLimit(10)); .SetConcurrencyLimit(10));
}); });
rider.AddConsumer<ScheduledMeterReadingConsumer>(); rider.AddConsumer<ScheduledMeterReadingConsumer>();
rider.AddProducer<string, MessageReceivedLogin>(ProtocolConst.SubscriberReceivedLoginEventName);
rider.AddProducer<string, ReceivedHeartbeatConsumer>(ProtocolConst.SubscriberReceivedHeartbeatEventName);
rider.UsingKafka((c, cfg) => rider.UsingKafka((c, cfg) =>
{ {
List<string> hosts = new List<string>() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" }; cfg.Host(configuration.GetConnectionString("Kafka"));
cfg.Host(hosts);
cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator => cfg.TopicEndpoint<MessageReceivedHeartbeat>(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator =>
{ {