This commit is contained in:
zenghongyao 2025-04-17 13:56:25 +08:00
commit 0584515df9
8 changed files with 30 additions and 21 deletions

View File

@ -13,12 +13,14 @@ using JiShe.CollectBus.IoTDBProvider;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Protocol.Contracts;
using JiShe.CollectBus.RedisDataCache;
using JiShe.CollectBus.Repository.MeterReadingRecord;
using Mapster;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -38,22 +40,22 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IMeterReadingRecordRepository _meterReadingRecordRepository;
private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly ICapPublisher _producerBus;
private readonly KafkaOptionConfig _kafkaOptions;
public BasicScheduledMeterReadingService(
ILogger<BasicScheduledMeterReadingService> logger,
ICapPublisher producerBus,
IMeterReadingRecordRepository meterReadingRecordRepository,
IProducerService producerService,
IRedisDataCacheService redisDataCacheService,
IIoTDBProvider dbProvider)
IIoTDBProvider dbProvider,
IOptions<KafkaOptionConfig> kafkaOptions)
{
_producerBus = producerBus;
_logger = logger;
_dbProvider = dbProvider;
_meterReadingRecordRepository = meterReadingRecordRepository;
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
}
/// <summary>
@ -302,7 +304,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
timer1.Stop();
_logger.LogError($"读取数据更花费时间{timer1.ElapsedMilliseconds}毫秒");
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista);
//DeviceGroupBalanceControl.InitializeCache(focusAddressDataLista, _kafkaOptions.NumPartitions);
//return;
#else
var meterInfos = await GetAmmeterInfoList(gatherCode);
@ -428,7 +430,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
else
{
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList);
DeviceGroupBalanceControl.InitializeCache(focusAddressDataList, _kafkaOptions.NumPartitions);
}
timer.Stop();

View File

@ -15,6 +15,7 @@ using JiShe.CollectBus.IotSystems.Devices;
using JiShe.CollectBus.IotSystems.MessageIssueds;
using JiShe.CollectBus.IotSystems.MeterReadingRecords;
using JiShe.CollectBus.IotSystems.Watermeter;
using JiShe.CollectBus.Kafka;
using JiShe.CollectBus.Kafka.Producer;
using JiShe.CollectBus.Repository;
using JiShe.CollectBus.Repository.MeterReadingRecord;
@ -23,6 +24,7 @@ using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.Uow;
@ -38,19 +40,19 @@ namespace JiShe.CollectBus.ScheduledMeterReading
string serverTagName = string.Empty;
public EnergySystemScheduledMeterReadingService(
ILogger<EnergySystemScheduledMeterReadingService> logger,
ICapPublisher producerBus, IIoTDBProvider dbProvider,
IIoTDBProvider dbProvider,
IMeterReadingRecordRepository meterReadingRecordRepository,
IConfiguration configuration,
IOptions<KafkaOptionConfig> kafkaOptions,
IProducerService producerService,
IRedisDataCacheService redisDataCacheService)
: base(logger,
producerBus,
meterReadingRecordRepository,
producerService,
redisDataCacheService,
dbProvider)
dbProvider,
kafkaOptions)
{
serverTagName = configuration.GetValue<string>(CommonConst.ServerTagName)!;
serverTagName = kafkaOptions.Value.ServerTagName;
}
public sealed override string SystemType => SystemTypeConst.Energy;

View File

@ -17,13 +17,13 @@ namespace JiShe.CollectBus.Cassandra
{
Configure<CassandraConfig>(context.Services.GetConfiguration().GetSection("Cassandra"));
context.AddCassandra();
// context.AddCassandra();
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.UseCassandra();
// context.UseCassandra();
}
}
}

View File

@ -37,7 +37,8 @@ namespace JiShe.CollectBus.Common.Helpers
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
Converters = { new DateTimeJsonConverter() }, // 注册你的自定义转换器,
DefaultBufferSize = 4096,
};
}
@ -77,7 +78,8 @@ namespace JiShe.CollectBus.Common.Helpers
ReadCommentHandling = JsonCommentHandling.Skip, // 忽略注释
PropertyNameCaseInsensitive = true, // 属性名称大小写不敏感
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // 属性名称使用驼峰命名规则
Converters = { new DateTimeJsonConverter() } // 注册你的自定义转换器,
Converters = { new DateTimeJsonConverter() }, // 注册你的自定义转换器,
DefaultBufferSize = 4096,
};
}

View File

@ -91,7 +91,8 @@
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30
"NumPartitions": 30,
"ServerTagName": "JiSheCollectBus3"
//"Topic": {
// "ReplicationFactor": 3,
// "NumPartitions": 1000

View File

@ -27,7 +27,7 @@ namespace JiShe.CollectBus.Kafka
//}
//context.Services.AddSingleton(kafkaOptionConfig);
context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
//context.Services.Configure<KafkaOptionConfig>(context.Services.GetConfiguration().GetSection(CommonConst.Kafka));
Configure<KafkaOptionConfig>(options =>
{

View File

@ -1,6 +1,7 @@
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Text;
@ -15,11 +16,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaOptionConfig _kafkaOptionConfig;
private class KafkaConsumer<TKey, TValue> where TKey : notnull where TValue : class { }
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger, KafkaOptionConfig kafkaOptionConfig)
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
_kafkaOptionConfig = kafkaOptionConfig.Value;
}
#region private

View File

@ -8,6 +8,7 @@ using Confluent.Kafka;
using JiShe.CollectBus.Kafka.Consumer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.DependencyInjection;
using YamlDotNet.Serialization;
@ -20,11 +21,11 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, KafkaOptionConfig kafkaOptionConfig)
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig;
_kafkaOptionConfig = kafkaOptionConfig.Value;
}
#region private