diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index eac2a92..9184a62 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -13,12 +13,14 @@ using JiShe.CollectBus.IoTDBProvider; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using JiShe.CollectBus.RedisDataCache; using JiShe.CollectBus.Repository.MeterReadingRecord; using Mapster; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Diagnostics; @@ -38,22 +40,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; - private readonly ICapPublisher _producerBus; + private readonly KafkaOptionConfig _kafkaOptions; public BasicScheduledMeterReadingService( ILogger logger, - ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, IProducerService producerService, IRedisDataCacheService redisDataCacheService, - IIoTDBProvider dbProvider) + IIoTDBProvider dbProvider, + IOptions kafkaOptions) { - _producerBus = producerBus; _logger = logger; _dbProvider = dbProvider; _meterReadingRecordRepository = meterReadingRecordRepository; _producerService = producerService; _redisDataCacheService = redisDataCacheService; + _kafkaOptions = kafkaOptions.Value; } /// @@ -302,7 +304,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading timer1.Stop(); _logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒"); - //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista); + //DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions); //return; #else var meterInfos = await GetAmmeterInfoList(gatherCode); @@ -428,7 +430,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading } else { - DeviceGroupBalanceControl.InitializeCache(focusAddressDataList); + DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions); } timer.Stop(); diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 25c5476..cfb0193 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -15,6 +15,7 @@ using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.IotSystems.Watermeter; +using JiShe.CollectBus.Kafka; using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Repository; using JiShe.CollectBus.Repository.MeterReadingRecord; @@ -23,6 +24,7 @@ using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Volo.Abp.Domain.Repositories; using Volo.Abp.Uow; @@ -38,19 +40,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService( ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, + IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository, - IConfiguration configuration, + IOptions kafkaOptions, IProducerService producerService, IRedisDataCacheService redisDataCacheService) : base(logger, - producerBus, meterReadingRecordRepository, producerService, redisDataCacheService, - dbProvider) + dbProvider, + kafkaOptions) { - serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; + serverTagName = kafkaOptions.Value.ServerTagName; } public sealed override string SystemType => SystemTypeConst.Energy; diff --git a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs index 2502420..2d70f07 100644 --- a/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs +++ b/src/JiShe.CollectBus.Cassandra/CollectBusCassandraModule.cs @@ -17,13 +17,13 @@ namespace JiShe.CollectBus.Cassandra { Configure(context.Services.GetConfiguration().GetSection("Cassandra")); - context.AddCassandra(); + // context.AddCassandra(); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { - context.UseCassandra(); + // context.UseCassandra(); } } } diff --git a/src/JiShe.CollectBus.Common/Helpers/BusJsonSerializer.cs b/src/JiShe.CollectBus.Common/Helpers/BusJsonSerializer.cs index f938fd8..713501e 100644 --- a/src/JiShe.CollectBus.Common/Helpers/BusJsonSerializer.cs +++ b/src/JiShe.CollectBus.Common/Helpers/BusJsonSerializer.cs @@ -37,7 +37,8 @@ namespace JiShe.CollectBus.Common.Helpers ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 - Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + Converters = { new DateTimeJsonConverter() }, // 注册你的自定义转换器, + DefaultBufferSize = 4096, }; } @@ -77,7 +78,8 @@ namespace JiShe.CollectBus.Common.Helpers ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释 PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感 PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则 - Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器, + Converters = { new DateTimeJsonConverter() }, // 注册你的自定义转换器, + DefaultBufferSize = 4096, }; } diff --git a/src/JiShe.CollectBus.Host/appsettings.json b/src/JiShe.CollectBus.Host/appsettings.json index 735ce0d..501a2a0 100644 --- a/src/JiShe.CollectBus.Host/appsettings.json +++ b/src/JiShe.CollectBus.Host/appsettings.json @@ -91,7 +91,8 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 30 + "NumPartitions": 30, + "ServerTagName": "JiSheCollectBus3" //"Topic": { // "ReplicationFactor": 3, // "NumPartitions": 1000 diff --git a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs index 7d0b73c..57e0a3e 100644 --- a/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs +++ b/src/JiShe.CollectBus.KafkaProducer/CollectBusKafkaModule.cs @@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Kafka //} //context.Services.AddSingleton(kafkaOptionConfig); - context.Services.Configure(context.Services.GetConfiguration().GetSection(CommonConst.Kafka)); + //context.Services.Configure(context.Services.GetConfiguration().GetSection(CommonConst.Kafka)); Configure(options => { diff --git a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs index eeb5661..24f2029 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Consumer/ConsumerService.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Text; @@ -15,11 +16,11 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly KafkaOptionConfig _kafkaOptionConfig; private class KafkaConsumer where TKey : notnull where TValue : class { } - public ConsumerService(IConfiguration configuration, ILogger logger, KafkaOptionConfig kafkaOptionConfig) + public ConsumerService(IConfiguration configuration, ILogger logger, IOptions kafkaOptionConfig) { _configuration = configuration; _logger = logger; - _kafkaOptionConfig = kafkaOptionConfig; + _kafkaOptionConfig = kafkaOptionConfig.Value; } #region private 私有方法 diff --git a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs index 27702e0..42fc9cf 100644 --- a/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs +++ b/src/JiShe.CollectBus.KafkaProducer/Producer/ProducerService.cs @@ -8,6 +8,7 @@ using Confluent.Kafka; using JiShe.CollectBus.Kafka.Consumer; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; using YamlDotNet.Serialization; @@ -20,11 +21,11 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } private readonly KafkaOptionConfig _kafkaOptionConfig; - public ProducerService(IConfiguration configuration,ILogger logger, KafkaOptionConfig kafkaOptionConfig) + public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig) { _configuration = configuration; _logger = logger; - _kafkaOptionConfig = kafkaOptionConfig; + _kafkaOptionConfig = kafkaOptionConfig.Value; } #region private 私有方法