优化服务应用配置,将其单独管理。

This commit is contained in:
ChenYi 2025-04-25 09:28:20 +08:00
parent f2be3a5516
commit e8a1a7d23e
8 changed files with 86 additions and 42 deletions

View File

@ -1,4 +1,5 @@
using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
@ -30,6 +31,8 @@ namespace JiShe.CollectBus.Kafka.Consumer
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly ServerApplicationOptions _applicationOptions;
private readonly KafkaPollyPipeline _kafkaPollyPipeline;
/// <summary>
@ -37,10 +40,11 @@ namespace JiShe.CollectBus.Kafka.Consumer
/// </summary>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline)
public ConsumerService(ILogger<ConsumerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions<ServerApplicationOptions> applicationOptions)
{
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value;
_kafkaPollyPipeline = kafkaPollyPipeline;
}
@ -67,7 +71,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
var config = new ConsumerConfig
{
BootstrapServers = _kafkaOptionConfig.BootstrapServers,
GroupId = groupId ?? _kafkaOptionConfig.ServerTagName,
GroupId = groupId ?? _applicationOptions.ServerTagName,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false, // 禁止AutoCommit
EnablePartitionEof = true, // 启用分区末尾标记
@ -161,7 +165,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -244,7 +248,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
}
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -348,7 +352,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{
@ -485,7 +489,7 @@ namespace JiShe.CollectBus.Kafka.Consumer
{
if (_kafkaOptionConfig.EnableFilter)
{
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } };
var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } };
// 检查 Header 是否符合条件
if (!headersFilter.Match(result.Message.Headers))
{

View File

@ -9,11 +9,6 @@ public class KafkaOptionConfig
/// </summary>
public string BootstrapServers { get; set; } = null!;
/// <summary>
/// 服务器标识
/// </summary>
public string ServerTagName { get; set; } = "KafkaFilterKey";
/// <summary>
/// kafka主题副本数量
/// </summary>
@ -54,8 +49,4 @@ public class KafkaOptionConfig
/// </summary>
public string? SaslPassword { get; set; }
/// <summary>
/// 首次采集时间
/// </summary>
public DateTime? FirstCollectionTime { get; set; }
}

View File

@ -5,6 +5,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Kafka.Consumer;
using JiShe.CollectBus.Kafka.Internal;
using JiShe.CollectBus.Kafka.Serialization;
@ -23,18 +24,19 @@ namespace JiShe.CollectBus.Kafka.Producer
private readonly ConcurrentDictionary<Type, object> _producerCache = new();
private class KafkaProducer<TKey, TValue> where TKey : notnull where TValue : class { }
private readonly KafkaOptionConfig _kafkaOptionConfig;
private readonly ServerApplicationOptions _applicationOptions;
/// <summary>
/// ProducerService
/// </summary>
/// <param name="configuration"></param>
/// <param name="logger"></param>
/// <param name="kafkaOptionConfig"></param>
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig)
public ProducerService(IConfiguration configuration,ILogger<ProducerService> logger, IOptions<KafkaOptionConfig> kafkaOptionConfig, IOptions<ServerApplicationOptions> applicationOptions)
{
_configuration = configuration;
_logger = logger;
_kafkaOptionConfig = kafkaOptionConfig.Value;
_applicationOptions = applicationOptions.Value;
}
#region private
@ -119,7 +121,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
await producer.ProduceAsync(topic, message);
@ -141,7 +143,7 @@ namespace JiShe.CollectBus.Kafka.Producer
//Key= _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
await producer.ProduceAsync(topic, message);
@ -165,7 +167,7 @@ namespace JiShe.CollectBus.Kafka.Producer
Key = key,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
var typeKey = typeof(KafkaProducer<TKey, TValue>);
@ -200,7 +202,7 @@ namespace JiShe.CollectBus.Kafka.Producer
//Key = _kafkaOptionConfig.ServerTagName,
Value = value,
Headers = new Headers{
{ "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) }
{ "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) }
}
};
var typeKey = typeof(KafkaProducer<Null, TValue>);

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
@ -25,6 +26,7 @@ using JiShe.CollectBus.Protocol.Contracts.SendData;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Identity.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -44,6 +46,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
private readonly IProducerService _producerService;
private readonly IRedisDataCacheService _redisDataCacheService;
private readonly KafkaOptionConfig _kafkaOptions;
private readonly ServerApplicationOptions _applicationOptions;
private readonly IoTDBRuntimeContext _runtimeContext;
private readonly IProtocolService _protocolService;
@ -56,7 +59,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
IIoTDbProvider dbProvider,
IoTDBRuntimeContext runtimeContext,
IProtocolService protocolService,
IOptions<KafkaOptionConfig> kafkaOptions)
IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions)
{
_logger = logger;
_dbProvider = dbProvider;
@ -64,6 +68,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
_producerService = producerService;
_redisDataCacheService = redisDataCacheService;
_kafkaOptions = kafkaOptions.Value;
_applicationOptions = applicationOptions.Value;
_protocolService = protocolService;
_runtimeContext.UseTableSessionPool = true;
@ -113,6 +118,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading
}
var currentTime = DateTime.Now;
//定时抄读
foreach (var item in taskInfos)
{
var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync<TasksToBeIssueModel>(item);
@ -193,6 +200,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity);
await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel);
}
//电表定时阀控任务处理。
//电表定时广播校时,一天一次。
}
#region
@ -279,9 +289,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
if (_kafkaOptions.FirstCollectionTime.HasValue == false)
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_kafkaOptions.FirstCollectionTime = DateTime.Now;
_applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
foreach (var item in meterInfoGroupByTimeDensity)
@ -290,7 +300,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
LastTaskTime = null,
TimeDensity = item.Key,
NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。
@ -724,9 +734,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading
//根据采集频率分组,获得采集频率分组
var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity);
if (_kafkaOptions.FirstCollectionTime.HasValue == false)
if (_applicationOptions.FirstCollectionTime.HasValue == false)
{
_kafkaOptions.FirstCollectionTime = DateTime.Now;
_applicationOptions.FirstCollectionTime = DateTime.Now;
}
//先处理采集频率任务缓存
@ -736,7 +746,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
{
LastTaskTime = null,
TimeDensity = item.Key,
NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间
};
//todo 首次采集时间节点到目前运行时间中漏采的时间点可以考虑使用IoTDB的存储利用时间序列处理。

View File

@ -1,4 +1,5 @@
using JiShe.CollectBus.Application.Contracts;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Common.BuildSendDatas;
using JiShe.CollectBus.Common.Consts;
using JiShe.CollectBus.Common.DeviceBalanceControl;
@ -46,6 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading
ILogger<EnergySystemScheduledMeterReadingService> logger,
IIoTDbProvider dbProvider,
IOptions<KafkaOptionConfig> kafkaOptions,
IOptions<ServerApplicationOptions> applicationOptions,
IoTDBRuntimeContext runtimeContext,
IProducerService producerService,
IProtocolService protocolService,
@ -56,9 +58,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading
dbProvider,
runtimeContext,
protocolService,
kafkaOptions)
kafkaOptions,
applicationOptions)
{
serverTagName = kafkaOptions.Value.ServerTagName;
serverTagName = applicationOptions.Value.ServerTagName;
_dbProvider = dbProvider;
_logger = logger;
_protocolService = protocolService;

View File

@ -0,0 +1,23 @@
namespace JiShe.CollectBus.Common
{
/// <summary>
/// 服务器应用配置
/// </summary>
public class ServerApplicationOptions
{
/// <summary>
/// 服务器标识
/// </summary>
public required string ServerTagName { get; set; }
/// <summary>
/// 首次采集时间
/// </summary>
public DateTime? FirstCollectionTime { get; set; }
/// <summary>
/// 自动验证时间
/// </summary>
public required string AutomaticVerificationTime { get; set;}
}
}

View File

@ -1,8 +1,10 @@
using Hangfire;
using HealthChecks.UI.Client;
using JiShe.CollectBus.Common;
using JiShe.CollectBus.Host.Extensions;
using JiShe.CollectBus.Host.HealthChecks;
using JiShe.CollectBus.Host.Swaggers;
using JiShe.CollectBus.IoTDB.Options;
using JiShe.CollectBus.MongoDB;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Swashbuckle.AspNetCore.SwaggerUI;
@ -50,6 +52,12 @@ namespace JiShe.CollectBus.Host
ConfigureCustom(context, configuration);
ConfigureHealthChecks(context, configuration);
Configure<AbpClockOptions>(options => { options.Kind = DateTimeKind.Local; });
Configure<ServerApplicationOptions>(options =>
{
configuration.GetSection(nameof(ServerApplicationOptions)).Bind(options);
});
}

View File

@ -79,9 +79,7 @@
"SaslUserName": "lixiao",
"SaslPassword": "lixiao1980",
"KafkaReplicationFactor": 3,
"NumPartitions": 30,
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00"
"NumPartitions": 30
},
"IoTDBOptions": {
"UserName": "root",
@ -141,5 +139,10 @@
"DefaultIdempotence": true
}
},
"PlugInFolder": ""
"PlugInFolder": "",
"ServerApplicationOptions": {
"ServerTagName": "JiSheCollectBus100",
"FirstCollectionTime": "2025-04-22 16:07:00",
"AutomaticVerificationTime": "16:07:00"
}
}