diff --git a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs index 66643a5..0287f40 100644 --- a/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs +++ b/modules/JiShe.CollectBus.Kafka/AdminClient/AdminClientService.cs @@ -1,7 +1,9 @@ using Confluent.Kafka; using Confluent.Kafka.Admin; +using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Volo.Abp.DependencyInjection; namespace JiShe.CollectBus.Kafka.AdminClient; @@ -9,16 +11,17 @@ namespace JiShe.CollectBus.Kafka.AdminClient; public class AdminClientService : IAdminClientService, IDisposable, ISingletonDependency { private readonly ILogger _logger; - + private readonly KafkaOptionConfig _kafkaOptionConfig; /// /// Initializes a new instance of the class. /// /// /// - public AdminClientService(IConfiguration configuration, ILogger logger) + public AdminClientService(IConfiguration configuration, ILogger logger, IOptions kafkaOptionConfig) { _logger = logger; - Instance = GetInstance(configuration); + _kafkaOptionConfig = kafkaOptionConfig.Value; + Instance = GetInstance(); } /// @@ -142,22 +145,19 @@ public class AdminClientService : IAdminClientService, IDisposable, ISingletonDe /// /// Gets the instance. /// - /// The configuration. /// - public IAdminClient GetInstance(IConfiguration configuration) + public IAdminClient GetInstance() { - ArgumentException.ThrowIfNullOrWhiteSpace(configuration["Kafka:EnableAuthorization"]); - var enableAuthorization = bool.Parse(configuration["Kafka:EnableAuthorization"]!); var adminClientConfig = new AdminClientConfig { - BootstrapServers = configuration["Kafka:BootstrapServers"] + BootstrapServers = _kafkaOptionConfig.BootstrapServers }; - if (enableAuthorization) + if (_kafkaOptionConfig.EnableAuthorization) { adminClientConfig.SecurityProtocol = SecurityProtocol.SaslPlaintext; adminClientConfig.SaslMechanism = SaslMechanism.Plain; - adminClientConfig.SaslUsername = configuration["Kafka:SaslUserName"]; - adminClientConfig.SaslPassword = configuration["Kafka:SaslPassword"]; + adminClientConfig.SaslUsername = _kafkaOptionConfig.SaslUserName; + adminClientConfig.SaslPassword = _kafkaOptionConfig.SaslUserName; } return new AdminClientBuilder(adminClientConfig).Build(); } diff --git a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs index 6f1e4fd..fd4bb1b 100644 --- a/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs +++ b/modules/JiShe.CollectBus.Kafka/Internal/KafkaPollyPipeline.cs @@ -34,7 +34,8 @@ namespace JiShe.CollectBus.Kafka.Internal ErrorCode.NotCoordinatorForGroup, ErrorCode.NetworkException, ErrorCode.GroupCoordinatorNotAvailable, - ErrorCode.InvalidGroupId + ErrorCode.InvalidGroupId, + ErrorCode.IllegalGeneration }; return ex switch { diff --git a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs index 67e1794..8b13ac0 100644 --- a/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs +++ b/protocols/JiShe.CollectBus.Protocol.T37612012/AnalysisData/DataStorage.cs @@ -287,6 +287,20 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData }; _runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.InsertAsync(treeData); + // 数据帧 + var treeFrameData = new TreeModelSingleMeasuringEntity() + { + SystemName = _applicationOptions.SystemType, + DeviceId = $"{data.DeviceId}", + DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", + ProjectId = $"{data.ProjectId}", + Timestamps = timestamps, + SingleMeasuring = new Tuple(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty) + }; + + _runtimeContext.UseTableSessionPool = false; // 使树模型池 + await _dbProvider.InsertAsync(treeFrameData); + // 时间 var treeRecordingTimeData = new TreeModelSingleMeasuringEntity() { @@ -295,7 +309,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", ProjectId = $"{data.ProjectId}", Timestamps = timestamps, - SingleMeasuring = new Tuple(ConcentratorStatusFieldConst.RecordingTime, data.TimeSpan!.Value.GetDateTimeOffset().ToUnixTimeNanoseconds()) + SingleMeasuring = new Tuple(ConcentratorStatusFieldConst.RecordingTime, (data.TimeSpan.HasValue ? data.TimeSpan.Value : DateTime.Now).GetDateTimeOffset().ToUnixTimeNanoseconds()) }; _runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.InsertAsync(treeRecordingTimeData); @@ -311,19 +325,7 @@ namespace JiShe.CollectBus.Protocol.T37612012.AnalysisData }; _runtimeContext.UseTableSessionPool = false; // 使树模型池 await _dbProvider.InsertAsync(treeRemarkData); - // 数据帧 - var treeFrameData = new TreeModelSingleMeasuringEntity() - { - SystemName = _applicationOptions.SystemType, - DeviceId = $"{data.DeviceId}", - DeviceType = $"{data.DeviceType}.{IOTDBDataType.Status}", - ProjectId = $"{data.ProjectId}", - Timestamps = timestamps, - SingleMeasuring = new Tuple(ConcentratorStatusFieldConst.FrameData, analysisBaseDto.HexMessage ?? string.Empty ) - }; - - _runtimeContext.UseTableSessionPool = false; // 使树模型池 - await _dbProvider.InsertAsync(treeFrameData); + return await Task.FromResult(true); } diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 950f4d9..464537c 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -38,7 +38,7 @@ namespace JiShe.CollectBus.Subscribers /// 电表自动阀控下行消息消费订阅 /// /// - Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo issuedEventMessage); + Task AmmeterScheduledAutoValveControl(List issuedEventMessage); #endregion #region 水表消息采集 diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index 74ed99c..f661842 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -5,6 +5,7 @@ using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; using Microsoft.Extensions.Logging; using System; +using System.Collections.Generic; using System.Threading.Tasks; using TouchSocket.Sockets; @@ -67,11 +68,10 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] + //[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); - return await SendMessagesAsync(receivedMessage); } @@ -80,12 +80,14 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName)] - public async Task AmmeterScheduledAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName,EnableBatch =true,TaskCount=30,BatchSize =500)] + public async Task AmmeterScheduledAutoValveControl(List receivedMessage) { //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 - _logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); - return await SendMessagesAsync(receivedMessage); + //_logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); + _logger.LogWarning($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"); + return SubscribeAck.Success(); + //return await SendMessagesAsync(receivedMessage); } /// diff --git a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml index a58ded5..30e91e8 100644 --- a/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml +++ b/web/JiShe.CollectBus.Host/Pages/Monitor.cshtml @@ -16,6 +16,7 @@ 后端服务 +