diff --git a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs index 085ab8e..4230150 100644 --- a/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs +++ b/src/JiShe.CollectBus.Application/CollectBusApplicationModule.cs @@ -16,6 +16,13 @@ using Volo.Abp.BackgroundWorkers.Hangfire; using JiShe.CollectBus.MongoDB; using JiShe.CollectBus.ScheduledMeterReading; using JiShe.CollectBus.IoTDBProvider; +using Confluent.Kafka.Admin; +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using JiShe.CollectBus.Protocol.Contracts; +using System.Collections.Generic; +using Thrift; +using Microsoft.Extensions.Configuration; namespace JiShe.CollectBus; @@ -60,7 +67,7 @@ public class CollectBusApplicationModule : AbpModule var dbContext = context.ServiceProvider.GetRequiredService(); //默认初始化表计信息 - dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); + dbContext.InitAmmeterCacheData().ConfigureAwait(false).GetAwaiter().GetResult(); } } diff --git a/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs b/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs index 64519c7..24945de 100644 --- a/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs +++ b/src/JiShe.CollectBus.Domain/CollectBusDomainModule.cs @@ -12,7 +12,7 @@ namespace JiShe.CollectBus; [DependsOn( typeof(CollectBusDomainSharedModule), typeof(AbpAuditLoggingDomainModule), - typeof(AbpCachingModule), + typeof(AbpCachingModule), typeof(AbpBackgroundJobsDomainModule) )] public class CollectBusDomainModule : AbpModule diff --git a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs index 40f498e..80145ab 100644 --- a/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs +++ b/src/JiShe.CollectBus.Host/CollectBusHostModule.Configure.cs @@ -23,6 +23,7 @@ using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MessageIssueds; using Confluent.Kafka; using MassTransit.SqlTransport.Topology; +using Confluent.Kafka.Admin; namespace JiShe.CollectBus.Host @@ -286,29 +287,51 @@ namespace JiShe.CollectBus.Host /// The configuration. public void ConfigureMassTransit(ServiceConfigurationContext context, IConfiguration configuration) { + + + var adminClient = new AdminClientBuilder(new AdminClientConfig + { + BootstrapServers = configuration.GetConnectionString("Kafka") + }).Build(); + + try + { + var topics = ProtocolConstExtensions.GetAllTopicNames(); + List topicSpecifications = new List(); + foreach (var item in topics) + { + topicSpecifications.Add(new TopicSpecification + { + Name = item, + NumPartitions = 3, + ReplicationFactor = 1 + }); + } + adminClient.CreateTopicsAsync(topicSpecifications).ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (CreateTopicsException e) + { + if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists) + { + throw; + } + } + + var consumerConfig = new ConsumerConfig { GroupId = ProtocolConst.SubscriberGroup }; var producerConfig = new ProducerConfig(); - context.Services -// .ConfigureKafkaTestOptions(options => -// { - -//#if DEBUG -// options.CleanTopicsOnStart = true;// 测试时,每次启动都删除topic,生产环境不需要 -//#endif -// options.CreateTopicsIfNotExists = true; -// options.TopicNames = ProtocolConstExtensions.GetAllTopicNames(); -// }) + context.Services .AddMassTransit(x => { x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context)); - //x.AddConfigureEndpointsCallback((c, name, cfg) => - //{ - // cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); - // cfg.UseMessageRetry(r => r.Immediate(5)); - // cfg.UseInMemoryOutbox(c); - //}); + x.AddConfigureEndpointsCallback((c, name, cfg) => + { + cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); + cfg.UseMessageRetry(r => r.Immediate(5)); + cfg.UseInMemoryOutbox(c); + }); x.AddRider(rider => { @@ -322,14 +345,15 @@ namespace JiShe.CollectBus.Host .SetTimeLimit(s: 1) .SetTimeLimitStart(BatchTimeLimitStart.FromLast) .SetConcurrencyLimit(10)); - }); - + }); rider.AddConsumer(); + rider.AddProducer(ProtocolConst.SubscriberReceivedLoginEventName); + rider.AddProducer(ProtocolConst.SubscriberReceivedHeartbeatEventName); + rider.UsingKafka((c, cfg) => { - List hosts = new List() { "121.42.242.91:29092", "121.42.242.91:39092", "121.42.242.91:49092" }; - cfg.Host(hosts); + cfg.Host(configuration.GetConnectionString("Kafka")); cfg.TopicEndpoint(ProtocolConst.SubscriberReceivedHeartbeatEventName, consumerConfig, configurator => {