From 80355a409ab2b2a9f783598a153b04f19aacf394 Mon Sep 17 00:00:00 2001 From: cli <377476583@qq.com> Date: Wed, 2 Apr 2025 17:54:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- JiShe.CollectBus.sln | 7 +++ .../Workers/CreateToBeIssueTaskWorker.cs | 2 - .../Workers/SubscriberFifteenMinuteWorker.cs | 2 - .../Workers/SubscriberFiveMinuteWorker.cs | 3 - .../Workers/SubscriberOneMinuteWorker.cs | 2 - .../JiShe.CollectBus.DbMigrator.csproj | 6 +- src/JiShe.CollectBus.Host/appsettings.json | 10 ++-- .../CollectBusKafkaModule.cs | 11 ++++ .../ConsumerService.cs | 58 +++++++++++++++++++ .../JiShe.CollectBus.Kafka.csproj | 14 +++++ .../ProducerService.cs | 28 +++++++++ 11 files changed, 126 insertions(+), 17 deletions(-) create mode 100644 src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs create mode 100644 src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj create mode 100644 src/JiShe.CollectBus.KafkaProducer/ProducerService.cs diff --git a/JiShe.CollectBus.sln b/JiShe.CollectBus.sln index 674eca4..62bfe8b 100644 --- a/JiShe.CollectBus.sln +++ b/JiShe.CollectBus.sln @@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JiShe.CollectBus.FreeSql", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.FreeRedisProvider", "src\JiShe.CollectBus.FreeRedisProvider\JiShe.CollectBus.FreeRedisProvider.csproj", "{C06C4082-638F-2996-5FED-7784475766C1}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JiShe.CollectBus.Kafka", "src\JiShe.CollectBus.KafkaProducer\JiShe.CollectBus.Kafka.csproj", "{919F4CDB-5C82-4371-B209-403B408DA248}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -89,6 +91,10 @@ Global {C06C4082-638F-2996-5FED-7784475766C1}.Debug|Any CPU.Build.0 = Debug|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.ActiveCfg = Release|Any CPU {C06C4082-638F-2996-5FED-7784475766C1}.Release|Any CPU.Build.0 = Release|Any CPU + {919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {919F4CDB-5C82-4371-B209-403B408DA248}.Debug|Any CPU.Build.0 = Debug|Any CPU + {919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.ActiveCfg = Release|Any CPU + {919F4CDB-5C82-4371-B209-403B408DA248}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -107,6 +113,7 @@ Global {8BA01C3D-297D-42DF-BD63-EF07202A0A67} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {FE0457D9-4038-4A17-8808-DCAD06CFC0A0} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} {C06C4082-638F-2996-5FED-7784475766C1} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} + {919F4CDB-5C82-4371-B209-403B408DA248} = {649A3FFA-182F-4E56-9717-E6A9A2BEC545} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4324B3B4-B60B-4E3C-91D8-59576B4E26DD} diff --git a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs index c3b4886..b936bbb 100644 --- a/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/CreateToBeIssueTaskWorker.cs @@ -34,9 +34,7 @@ namespace JiShe.CollectBus.Workers public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - _logger.LogWarning($"构建待处理的下发指令任务处理开始"); //await _scheduledMeterReadingService.CreateToBeIssueTasks(); - _logger.LogWarning($"构建待处理的下发指令任务处理结束"); } } } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs index 9be418a..f1bf5a1 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFifteenMinuteWorker.cs @@ -33,11 +33,9 @@ namespace JiShe.CollectBus.Workers public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - _logger.LogWarning($"15分钟采集数据开始"); //await _scheduledMeterReadingService.AmmeterScheduledMeterFifteenMinuteReading(); //await _scheduledMeterReadingService.WatermeterScheduledMeterFifteenMinuteReading(); - _logger.LogWarning($"15分钟采集数据结束"); //using (var uow = LazyServiceProvider.LazyGetRequiredService().Begin()) //{ // Logger.LogInformation("Executed MyLogWorker..!"); diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs index 94e323d..2e491d6 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberFiveMinuteWorker.cs @@ -33,11 +33,8 @@ namespace JiShe.CollectBus.Workers public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - _logger.LogWarning($"5分钟采集数据开始"); //await _scheduledMeterReadingService.AmmeterScheduledMeterFiveMinuteReading(); //await _scheduledMeterReadingService.WatermeterScheduledMeterFiveMinuteReading(); - - _logger.LogWarning($"5分钟采集数据结束"); } } } diff --git a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs index d7c8325..82b979b 100644 --- a/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs +++ b/src/JiShe.CollectBus.Application/Workers/SubscriberOneMinuteWorker.cs @@ -33,12 +33,10 @@ namespace JiShe.CollectBus.Workers public override async Task DoWorkAsync(CancellationToken cancellationToken = new CancellationToken()) { - _logger.LogWarning($"1分钟采集数据开始"); //await _scheduledMeterReadingService.AmmeterScheduledMeterOneMinuteReading(); //await _scheduledMeterReadingService.WatermeterScheduledMeterOneMinuteReading(); - _logger.LogWarning($"1分钟采集数据结束"); } } } diff --git a/src/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj b/src/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj index a333f5a..e163f7e 100644 --- a/src/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj +++ b/src/JiShe.CollectBus.DbMigrator/JiShe.CollectBus.DbMigrator.csproj @@ -10,9 +10,9 @@ - - - + + + diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 26575c4..0936310 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -34,14 +34,14 @@ "CorsOrigins": "http://localhost:4200,http://localhost:3100" }, "ConnectionStrings": { - "Default": "mongodb://admin:lixiao1980@8.148.224.127:27017,8.148.224.21:27017,8.138.38.208:27017/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", - //"Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", - "Kafka": "8.148.227.21:9092,8.148.224.127:9092,8.138.38.208:9092", + "Default": "mongodb://admin:admin02023@118.190.144.92:37117,118.190.144.92:37119,118.190.144.92:37120/JiSheCollectBus?authSource=admin&maxPoolSize=400&minPoolSize=10&waitQueueTimeoutMS=5000", + "Kafka": "121.42.242.91:29092,121.42.242.91:39092,121.42.242.91:49092", + //"Kafka": "8.148.227.21:9092,8.148.224.127:9092,8.138.38.208:9092", "PrepayDB": "server=118.190.144.92;database=jishe.sysdb;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False", "EnergyDB": "server=118.190.144.92;database=db_energy;uid=sa;pwd=admin@2023;Encrypt=False;Trust Server Certificate=False" }, "Redis": { - "Configuration": "8.138.38.208:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true,password=lixiao1980", + "Configuration": "118.190.144.92:6379,syncTimeout=30000,abortConnect=false,connectTimeout=30000,allowAdmin=true", "DefaultDB": "14", "HangfireDB": "15" @@ -84,7 +84,7 @@ } }, "Kafka": { - "EnableAuthorization": true, + "EnableAuthorization": false, "SecurityProtocol": "SASL_PLAINTEXT", "SaslMechanism": "PLAIN", "SaslUserName": "lixiao", diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs new file mode 100644 index 0000000..b307155 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -0,0 +1,11 @@ +using Volo.Abp.Modularity; + +namespace JiShe.CollectBus.KafkaProducer +{ + public class CollectBusKafkaModule : AbpModule + { + public override void ConfigureServices(ServiceConfigurationContext context) + { + } + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs new file mode 100644 index 0000000..3d1d4e0 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/ConsumerService.cs @@ -0,0 +1,58 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JiShe.CollectBus.Kafka +{ + public abstract class ConsumerService : BackgroundService + { + private readonly IConsumer _consumer; + + private readonly ILogger> _logger; + + /// + /// Initializes a new instance of the class. + /// + /// The configuration. + /// The consumer. + /// The logger. + protected ConsumerService(IConfiguration configuration, IConsumer consumer, ILogger> logger) + { + _consumer = consumer; + _logger = logger; + var consumerConfig = new ConsumerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + GroupId = "InventoryConsumerGroup", + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + _consumer = new ConsumerBuilder(consumerConfig).Build(); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _consumer.Subscribe("InventoryUpdates"); + + while (!stoppingToken.IsCancellationRequested) + { + + var consumeResult = _consumer.Consume(stoppingToken); + + var message = consumeResult.Message.Value; + + await ProcessMessageAsync(consumeResult); + } + + _consumer.Close(); + await Task.CompletedTask; + } + protected abstract Task ProcessMessageAsync(ConsumeResult consumer); + } +} diff --git a/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj new file mode 100644 index 0000000..72b2660 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/JiShe.CollectBus.Kafka.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + + diff --git a/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs new file mode 100644 index 0000000..6651b59 --- /dev/null +++ b/src/JiShe.CollectBus.KafkaProducer/ProducerService.cs @@ -0,0 +1,28 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Configuration; +using Volo.Abp.DependencyInjection; + +namespace JiShe.CollectBus.Kafka +{ + public class ProducerService : ITransientDependency + { + private readonly IProducer _producer; + + public ProducerService(IConfiguration configuration) + { + var producerConfig = new ProducerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + + _producer = new ProducerBuilder(producerConfig).Build(); + } + + public async Task ProduceAsync(string topic, T message) + { + var msg = new Message { Value = message }; + + await _producer.ProduceAsync(topic, msg); + } + } +}