diff --git a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs index d4a62db..7f8c31c 100644 --- a/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Consumer/ConsumerService.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; @@ -30,6 +31,8 @@ namespace JiShe.CollectBus.Kafka.Consumer private readonly KafkaOptionConfig _kafkaOptionConfig; + private readonly ServerApplicationOptions _applicationOptions; + private readonly KafkaPollyPipeline _kafkaPollyPipeline; /// @@ -37,10 +40,11 @@ namespace JiShe.CollectBus.Kafka.Consumer /// /// /// - public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline) + public ConsumerService(ILogger logger, IOptions kafkaOptionConfig, KafkaPollyPipeline kafkaPollyPipeline, IOptions applicationOptions) { _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _applicationOptions = applicationOptions.Value; _kafkaPollyPipeline = kafkaPollyPipeline; } @@ -67,7 +71,7 @@ namespace JiShe.CollectBus.Kafka.Consumer var config = new ConsumerConfig { BootstrapServers = _kafkaOptionConfig.BootstrapServers, - GroupId = groupId ?? _kafkaOptionConfig.ServerTagName, + GroupId = groupId ?? _applicationOptions.ServerTagName, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false, // 禁止AutoCommit EnablePartitionEof = true, // 启用分区末尾标记 @@ -161,7 +165,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -244,7 +248,7 @@ namespace JiShe.CollectBus.Kafka.Consumer } if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -348,7 +352,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { @@ -485,7 +489,7 @@ namespace JiShe.CollectBus.Kafka.Consumer { if (_kafkaOptionConfig.EnableFilter) { - var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } }; + var headersFilter = new HeadersFilter { { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } }; // 检查 Header 是否符合条件 if (!headersFilter.Match(result.Message.Headers)) { diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs index 3bea5f1..38c9482 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaOptionConfig.cs @@ -8,12 +8,7 @@ public class KafkaOptionConfig /// kafka地址 /// public string BootstrapServers { get; set; } = null!; - - /// - /// 服务器标识 - /// - public string ServerTagName { get; set; } = "KafkaFilterKey"; - + /// /// kafka主题副本数量 /// @@ -54,8 +49,4 @@ public class KafkaOptionConfig /// public string? SaslPassword { get; set; } - /// - /// 首次采集时间 - /// - public DateTime? FirstCollectionTime { get; set; } } \ No newline at end of file diff --git a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs index 16499b5..50df423 100644 --- a/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs +++ b/modules/JiShe.CollectBus.Kafka/Producer/ProducerService.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using Confluent.Kafka; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Kafka.Consumer; using JiShe.CollectBus.Kafka.Internal; using JiShe.CollectBus.Kafka.Serialization; @@ -23,18 +24,19 @@ namespace JiShe.CollectBus.Kafka.Producer private readonly ConcurrentDictionary _producerCache = new(); private class KafkaProducer where TKey : notnull where TValue : class { } private readonly KafkaOptionConfig _kafkaOptionConfig; - + private readonly ServerApplicationOptions _applicationOptions; /// /// ProducerService /// /// /// /// - public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig) + public ProducerService(IConfiguration configuration,ILogger logger, IOptions kafkaOptionConfig, IOptions applicationOptions) { _configuration = configuration; _logger = logger; _kafkaOptionConfig = kafkaOptionConfig.Value; + _applicationOptions = applicationOptions.Value; } #region private 私有方法 @@ -119,7 +121,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -141,7 +143,7 @@ namespace JiShe.CollectBus.Kafka.Producer //Key= _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; await producer.ProduceAsync(topic, message); @@ -165,7 +167,7 @@ namespace JiShe.CollectBus.Kafka.Producer Key = key, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; var typeKey = typeof(KafkaProducer); @@ -200,7 +202,7 @@ namespace JiShe.CollectBus.Kafka.Producer //Key = _kafkaOptionConfig.ServerTagName, Value = value, Headers = new Headers{ - { "route-key", Encoding.UTF8.GetBytes(_kafkaOptionConfig.ServerTagName) } + { "route-key", Encoding.UTF8.GetBytes(_applicationOptions.ServerTagName) } } }; var typeKey = typeof(KafkaProducer); diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 4a7d74c..750b938 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -25,6 +26,7 @@ using JiShe.CollectBus.Protocol.Contracts.SendData; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Microsoft.Identity.Client; using System; using System.Collections.Generic; using System.Diagnostics; @@ -44,6 +46,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly IProducerService _producerService; private readonly IRedisDataCacheService _redisDataCacheService; private readonly KafkaOptionConfig _kafkaOptions; + private readonly ServerApplicationOptions _applicationOptions; private readonly IoTDBRuntimeContext _runtimeContext; private readonly IProtocolService _protocolService; @@ -56,7 +59,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading IIoTDbProvider dbProvider, IoTDBRuntimeContext runtimeContext, IProtocolService protocolService, - IOptions kafkaOptions) + IOptions kafkaOptions, + IOptions applicationOptions) { _logger = logger; _dbProvider = dbProvider; @@ -64,6 +68,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading _producerService = producerService; _redisDataCacheService = redisDataCacheService; _kafkaOptions = kafkaOptions.Value; + _applicationOptions = applicationOptions.Value; _protocolService = protocolService; _runtimeContext.UseTableSessionPool = true; @@ -113,6 +118,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading } var currentTime = DateTime.Now; + + //定时抄读 foreach (var item in taskInfos) { var tasksToBeIssueModel = await FreeRedisProvider.Instance.GetAsync(item); @@ -193,6 +200,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading tasksToBeIssueModel.NextTaskTime = currentTaskTime.CalculateNextCollectionTime(timeDensity); await FreeRedisProvider.Instance.SetAsync(item, tasksToBeIssueModel); } + + //电表定时阀控任务处理。 + //电表定时广播校时,一天一次。 } #region 电表采集处理 @@ -279,9 +289,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - if (_kafkaOptions.FirstCollectionTime.HasValue == false) + if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _kafkaOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = DateTime.Now; } //先处理采集频率任务缓存 foreach (var item in meterInfoGroupByTimeDensity) @@ -290,7 +300,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { LastTaskTime = null, TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 @@ -724,9 +734,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading //根据采集频率分组,获得采集频率分组 var meterInfoGroupByTimeDensity = meterInfos.GroupBy(d => d.TimeDensity); - if (_kafkaOptions.FirstCollectionTime.HasValue == false) + if (_applicationOptions.FirstCollectionTime.HasValue == false) { - _kafkaOptions.FirstCollectionTime = DateTime.Now; + _applicationOptions.FirstCollectionTime = DateTime.Now; } //先处理采集频率任务缓存 @@ -736,7 +746,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { LastTaskTime = null, TimeDensity = item.Key, - NextTaskTime = _kafkaOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 + NextTaskTime = _applicationOptions.FirstCollectionTime.Value.CalculateNextCollectionTime(item.Key),//使用首次采集时间作为下一次采集时间 }; //todo 首次采集时间节点到目前运行时间中漏采的时间点,可以考虑使用IoTDB的存储,利用时间序列处理。 diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index e2bae0a..fbe667a 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Application.Contracts; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; @@ -46,6 +47,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading ILogger logger, IIoTDbProvider dbProvider, IOptions kafkaOptions, + IOptions applicationOptions, IoTDBRuntimeContext runtimeContext, IProducerService producerService, IProtocolService protocolService, @@ -56,9 +58,10 @@ namespace JiShe.CollectBus.ScheduledMeterReading dbProvider, runtimeContext, protocolService, - kafkaOptions) + kafkaOptions, + applicationOptions) { - serverTagName = kafkaOptions.Value.ServerTagName; + serverTagName = applicationOptions.Value.ServerTagName; _dbProvider = dbProvider; _logger = logger; _protocolService = protocolService; @@ -180,7 +183,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// /// public override async Task AmmeterScheduledAutoValveControl() - { + { var currentTime = DateTime.Now; string currentTimeStr = $"{currentTime:HH:mm}"; @@ -199,7 +202,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading var ammeterInfos = new List(); List taskList = new List(); var metadata = await _dbProvider.GetMetadata(); - + foreach (var settingInfo in settingInfos) { bool isGenerate = false; @@ -249,9 +252,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading _logger.LogError($"集中器[{settingInfo.FocusAddress}],[{settingInfo.MeterId}]阀控命令错误:{settingInfo.TripType},-102"); continue; } - + var temCode = "10_01_"; - + //根据电表型号获取协议插件 var protocolPlugin = await _protocolService.GetProtocolServiceAsync(ammeterInfo.BrandType); if (protocolPlugin == null) @@ -272,8 +275,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading ItemCode = subItemCode, } }); - - string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA,builderResponse.Seq); + + string taskMark = CommonHelper.GetTaskMark(builderResponse.AFn, builderResponse.Fn, ammeterInfo.MeteringCode, builderResponse.MSA, builderResponse.Seq); var meterReadingRecords = new MeterReadingTelemetryPacketInfo() { SystemName = SystemType, diff --git a/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs new file mode 100644 index 0000000..6ed9e4a --- /dev/null +++ b/shared/JiShe.CollectBus.Common/Models/ServerApplicationOptions.cs @@ -0,0 +1,23 @@ +namespace JiShe.CollectBus.Common +{ + /// + /// 服务器应用配置 + /// + public class ServerApplicationOptions + { + /// + /// 服务器标识 + /// + public required string ServerTagName { get; set; } + + /// + /// 首次采集时间 + /// + public DateTime? FirstCollectionTime { get; set; } + + /// + /// 自动验证时间 + /// + public required string AutomaticVerificationTime { get; set;} + } +} diff --git a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs index 5068663..e451a1d 100644 --- a/web/JiShe.CollectBus.Host/CollectBusHostModule.cs +++ b/web/JiShe.CollectBus.Host/CollectBusHostModule.cs @@ -1,8 +1,10 @@ using Hangfire; using HealthChecks.UI.Client; +using JiShe.CollectBus.Common; using JiShe.CollectBus.Host.Extensions; using JiShe.CollectBus.Host.HealthChecks; using JiShe.CollectBus.Host.Swaggers; +using JiShe.CollectBus.IoTDB.Options; using JiShe.CollectBus.MongoDB; using Microsoft.AspNetCore.Diagnostics.HealthChecks; using Swashbuckle.AspNetCore.SwaggerUI; @@ -28,7 +30,7 @@ namespace JiShe.CollectBus.Host typeof(AbpSwashbuckleModule), typeof(AbpTimingModule), typeof(CollectBusApplicationModule), - typeof(CollectBusMongoDbModule), + typeof(CollectBusMongoDbModule), typeof(AbpCachingStackExchangeRedisModule), typeof(AbpBackgroundWorkersHangfireModule) )] @@ -50,6 +52,12 @@ namespace JiShe.CollectBus.Host ConfigureCustom(context, configuration); ConfigureHealthChecks(context, configuration); Configure(options => { options.Kind = DateTimeKind.Local; }); + + Configure(options => + { + configuration.GetSection(nameof(ServerApplicationOptions)).Bind(options); + }); + } diff --git a/web/JiShe.CollectBus.Host/appsettings.json b/web/JiShe.CollectBus.Host/appsettings.json index e5fe35d..4bd24be 100644 --- a/web/JiShe.CollectBus.Host/appsettings.json +++ b/web/JiShe.CollectBus.Host/appsettings.json @@ -79,9 +79,7 @@ "SaslUserName": "lixiao", "SaslPassword": "lixiao1980", "KafkaReplicationFactor": 3, - "NumPartitions": 30, - "ServerTagName": "JiSheCollectBus100", - "FirstCollectionTime": "2025-04-22 16:07:00" + "NumPartitions": 30 }, "IoTDBOptions": { "UserName": "root", @@ -141,5 +139,10 @@ "DefaultIdempotence": true } }, - "PlugInFolder": "" + "PlugInFolder": "", + "ServerApplicationOptions": { + "ServerTagName": "JiSheCollectBus100", + "FirstCollectionTime": "2025-04-22 16:07:00", + "AutomaticVerificationTime": "16:07:00" + } } \ No newline at end of file