From 1b4c4dd8ff43e1379feacb7330caf7a7aa556ea3 Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Mon, 7 Apr 2025 17:35:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CollectBusApplicationModule.cs | 9 ++- .../CollectBusDomainModule.cs | 2 +- .../CollectBusHostModule.Configure.cs | 64 +++++++++++++------ 3 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 085ab8e..4230150 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -16,6 +16,13 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.ScheduledMeterReading; using JiShe.CollectBus.IoTDBProvider; +using Confluent.Kafka.Admin; +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using JiShe.CollectBus.Protocol.Contracts; +using System.Collections.Generic; +using Thrift; +using Microsoft.Extensions.Configuration; namespace JiShe.CollectBus; @@ -60,7 +67,7 @@ public class CollectBusApplicationModule : AbpModule var dbContext = context.ServiceProvider.GetRequiredService(); //默认初始化表计信息 - dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); } } diff --git a/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs b/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs index 64519c7..24945de 100644 --- a/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs +++ b/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus; [DependsOn( typeof(CollectBusDomainSharedModule), typeof(AbpAuditLoggingDomainModule), - typeof(AbpCachingModule), + typeof(AbpCachingModule), typeof(AbpBackgroundJobsDomainModule) )] public class CollectBusDomainModule : AbpModule diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 40f498e..80145ab 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -23,6 +23,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageIssueds; using Confluent.Kafka; using MassTransit.SqlTransport.Topology; +using Confluent.Kafka.Admin; namespace JiShe.CollectBus.Host @@ -286,29 +287,51 @@ namespace JiShe.CollectBus.Host /// The configuration. public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) { + + + var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = configuration.GetConnectionString("Kafka") + }).Build(); + + try + { + var topics = ProtocolConstExtensions.GetAllTopicNames(); + List topicSpecifications = new List(); + foreach (var item in topics) + { + topicSpecifications.Add(new TopicSpecification + { + Name = item, + NumPartitions = 3, + ReplicationFactor = 1 + }); + } + adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (CreateTopicsException e) + { + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) + { + throw; + } + } + + var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; var producerConfig = new ProducerConfig(); - context.Services -// .ConfigureKafkaTestOptions(options => -// { - -//#if DEBUG -// options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要 -//#endif -// options.CreateTopicsIfNotExists = true; -// options.TopicNames = ProtocolConstExtensions.GetAllTopicNames(); -// }) + context.Services .AddMassTransit(x => { x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context)); - //x.AddConfigureEndpointsCallback((c, name, cfg) => - //{ - // cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); - // cfg.UseMessageRetry(r => r.Immediate(5)); - // cfg.UseInMemoryOutbox(c); - //}); + x.AddConfigureEndpointsCallback((c, name, cfg) => + { + cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); + cfg.UseMessageRetry(r => r.Immediate(5)); + cfg.UseInMemoryOutbox(c); + }); x.AddRider(rider => { @@ -322,14 +345,15 @@ namespace JiShe.CollectBus.Host .SetTimeLimit(s: 1) .SetTimeLimitStart(BatchTimeLimitStart.FromLast) .SetConcurrencyLimit(10)); - }); - + }); rider.AddConsumer(); + rider.AddProducer(ProtocolConst.SubscriberReceivedLoginEventName); + rider.AddProducer(ProtocolConst.SubscriberReceivedHeartbeatEventName); + rider.UsingKafka((c, cfg) => { - List hosts = new List() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" }; - cfg.Host(hosts); + cfg.Host(configuration.GetConnectionString("Kafka")); cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator => {