From 94ff58dd12c2b98c8210d7280a4e3f096340a73f Mon Sep 17 00:00:00 2001 From: ChenYi <296215406@outlook.com> Date: Wed, 14 May 2025 14:40:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84Kafka=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=B6=88=E8=B4=B9=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IWorkerSubscriberAppService.cs | 66 ++++++++-- .../BasicScheduledMeterReadingService.cs | 30 +++-- ...nergySystemScheduledMeterReadingService.cs | 76 +++++++++-- .../Subscribers/WorkerSubscriberAppService.cs | 124 ++++++++++++++++-- 4 files changed, 252 insertions(+), 44 deletions(-) diff --git a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs index 464537c..e899ce4 100644 --- a/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application.Contracts/Subscribers/IWorkerSubscriberAppService.cs @@ -1,6 +1,8 @@ -using JiShe.CollectBus.IotSystems.MessageIssueds; +using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.IotSystems.MessageIssueds; using JiShe.CollectBus.IotSystems.MessageReceiveds; using JiShe.CollectBus.IotSystems.MeterReadingRecords; +using JiShe.CollectBus.Kafka.Attributes; using JiShe.CollectBus.Kafka.Internal; using System.Collections.Generic; using System.Threading.Tasks; @@ -17,37 +19,83 @@ namespace JiShe.CollectBus.Subscribers #region 电表消息采集 /// - /// 1分钟采集电表数据下行消息消费订阅 + /// 一分钟定时抄读任务消息消费订阅 /// + /// /// - Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); + Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage); /// /// 5分钟采集电表数据下行消息消费订阅 /// + /// /// - Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); + Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage); /// /// 15分钟采集电表数据下行消息消费订阅 /// + /// /// - Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); + Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List receivedMessage); /// /// 电表自动阀控下行消息消费订阅 /// + /// /// - Task AmmeterScheduledAutoValveControl(List issuedEventMessage); + Task AmmeterScheduledAutoValveControl(List receivedMessage); + + /// + /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、SIM卡号、定时校时等下行消息消费订阅 + /// + /// + /// + Task AmmeterScheduledOther(List receivedMessage); + + /// + /// 电表手动阀控下行消息消费订阅 + /// + /// + /// + Task AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage); + + /// + /// 电表手动抄读下行消息消费订阅 + /// + /// + /// + Task AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage); #endregion #region 水表消息采集 /// - /// 1分钟采集水表数据下行消息消费订阅 + /// 水表数据下行消息消费订阅 /// + /// /// - Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo issuedEventMessage); - + Task WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage); + + /// + /// 水表自动阀控下行消息消费订阅 + /// + /// + /// + Task WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage); + + /// + /// 水表手动阀控下行消息消费订阅 + /// + /// + /// + Task WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage); + /// + /// 水表手动抄读下行消息消费订阅 + /// + /// + /// + Task WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage); + #endregion } } diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index b7da236..552f04f 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -28,6 +28,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Volo.Abp.Guids; namespace JiShe.CollectBus.ScheduledMeterReading { @@ -44,6 +45,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly DataMigrationOptions _dataMigrationOptions; private readonly KafkaOptionConfig _kafkaOptions; private readonly ServerApplicationOptions _applicationOptions; + private readonly IGuidGenerator _guidGenerator; int pageSize = 10000; @@ -53,6 +55,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, IProtocolService protocolService, + IGuidGenerator guidGenerator, IOptions dataMigrationOptions, IOptions kafkaOptions, IOptions applicationOptions) @@ -67,6 +70,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading _kafkaOptions = kafkaOptions.Value; _applicationOptions = applicationOptions.Value; + _guidGenerator = guidGenerator; + } /// @@ -323,7 +328,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// public virtual async Task InitAmmeterCacheData(string gatherCode = "") { - return; + //return; try { @@ -750,7 +755,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: null, pendingCopyReadTime: timestamps, creationTime: currentTime, - packetType: (TelemetryPacketTypeEnum)timeDensity); + packetType: (TelemetryPacketTypeEnum)timeDensity, + _guidGenerator); taskList.Add(meterReadingRecords); } @@ -832,7 +838,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: subItemCode, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime); + packetType: TelemetryPacketTypeEnum.AmmeterAutomaticVerificationTime, + _guidGenerator); taskList.Add(meterReadingRecords); if (taskList == null || taskList.Count <= 0) @@ -901,7 +908,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: null, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze); + packetType: TelemetryPacketTypeEnum.AmmeterDayFreeze, + _guidGenerator); taskList.Add(meterReadingRecords); } @@ -971,7 +979,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: null, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze); + packetType: TelemetryPacketTypeEnum.AmmeterMonthFreeze, + _guidGenerator); taskList.Add(meterReadingRecords); } @@ -1370,7 +1379,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: null, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.TerminalVersion); + packetType: TelemetryPacketTypeEnum.TerminalVersion, + _guidGenerator); taskList.Add(meterReadingRecords); @@ -1438,7 +1448,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: null, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.TelematicsModule); + packetType: TelemetryPacketTypeEnum.TelematicsModule, + _guidGenerator); taskList.Add(meterReadingRecords); @@ -1642,8 +1653,9 @@ namespace JiShe.CollectBus.ScheduledMeterReading /// 待采集时间,定时采集频率才是特殊情况,其他默认当前时间戳 /// 数据创建时间戳 /// 数据包类型 + /// Guid生成器 /// - protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType) + protected MeterReadingTelemetryPacketInfo CreateAmmeterPacketInfo(DeviceInfo ammeterInfo, long timestamps, ProtocolBuildResponse builderResponse, string itemCode, string subItemCode, DateTime pendingCopyReadTime, DateTime creationTime, TelemetryPacketTypeEnum packetType, IGuidGenerator guidGenerator) { try { @@ -1672,7 +1684,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IsSend = false, ManualOrNot = false, Pn = ammeterInfo.MeteringCode, - IssuedMessageId = Guid.NewGuid().ToString(), + IssuedMessageId = guidGenerator.Create().ToString(), IssuedMessageHexString = Convert.ToHexString(builderResponse.Data), IsReceived = false, ScoreValue = $"{ammeterInfo.FocusAddress}.{taskMark}".Md5Fun(), diff --git a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 562d2b1..0521dca 100644 --- a/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/services/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -22,10 +22,12 @@ using JiShe.CollectBus.Protocol.Models; using Microsoft.AspNetCore.Authorization; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using MongoDB.Bson.Serialization.IdGenerators; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Volo.Abp.Guids; namespace JiShe.CollectBus.ScheduledMeterReading @@ -41,6 +43,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading string serverTagName = string.Empty; private readonly ILogger _logger; private readonly IProtocolService _protocolService; + private readonly IGuidGenerator _guidGenerator; public EnergySystemScheduledMeterReadingService( ILogger logger, @@ -48,6 +51,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading IRedisDataCacheService redisDataCacheService, IIoTDbProvider dbProvider, IProtocolService protocolService, + IGuidGenerator guidGenerator, IOptions dataMigrationOptions, IOptions kafkaOptions, IOptions applicationOptions) @@ -56,6 +60,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading redisDataCacheService, dbProvider, protocolService, + guidGenerator, dataMigrationOptions, kafkaOptions, applicationOptions) @@ -64,6 +69,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading systemType = applicationOptions.Value.SystemType; _logger = logger; _protocolService = protocolService; + _guidGenerator = guidGenerator; } public sealed override string SystemType => systemType; @@ -108,15 +114,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading Baudrate = 2400, FocusAddress = "442400040", Name = "保利单箱电表1", - FocusId = 95778, + FocusId = 95780, DatabaseBusiID = 1, MeteringCode = 1, MeterAddress = "442405000040", - MeterId = 127136, - TypeName = 3, + MeterId = 127035, + TypeName = 1, DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", TimeDensity = 15, BrandType = "DDS1980", + MeterType = MeterTypeEnum.Ammeter, + ProjectID = 1, }); ammeterInfos.Add(new DeviceInfo() @@ -124,15 +132,17 @@ namespace JiShe.CollectBus.ScheduledMeterReading Baudrate = 2400, FocusAddress = "442400039", Name = "保利单箱电表2", - FocusId = 95778, + FocusId = 69280, DatabaseBusiID = 1, MeteringCode = 1, MeterAddress = "442405000039", - MeterId = 127236, - TypeName = 3, + MeterId = 95594, + TypeName = 1, DataTypes = "449,503,581,582,583,584,585,586,587,588,589,590,591,592,593,594,597,598,599,600,601,602,603,604,605,606,607,608,661,663,677,679", TimeDensity = 15, BrandType = "DDS1980", + MeterType = MeterTypeEnum.Ammeter, + ProjectID = 1, }); //ammeterInfos.Add(new DeviceInfo() @@ -215,7 +225,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading .QueryAsync(sql); } catch (Exception ex) - { + { throw ex; } } @@ -265,18 +275,51 @@ namespace JiShe.CollectBus.ScheduledMeterReading string currentTimeStr = $"{currentTime:HH:mm}"; try - { + { +#if DEBUG + var settingInfos = new List(); + settingInfos.Add(new AmmeterAutoValveControlSetting() + { + MeterType = MeterTypeEnum.Ammeter, + AmmerterAddress = "442405000040", + FocusAddress = "442400040", + FocusId = 95780, + ProjectID = 1, + TripType = "off", + TripTime = "14:02", + MeterId = 127035, + LoopType = "EachDay", + EachDayWithout = "周六,周日", + TimeDensity = 15, + }); +#else //获取电表阀控配置 var settingInfos = await GetAmmeterAutoValveControlSetting(currentTimeStr); - if (settingInfos == null || settingInfos.Count <= 0) { _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,阀控数据为空, -101"); return null; } +#endif - //批量获取对应的缓存电表信息 - var ammeterInfos = new List(); + + //设备hash缓存key + string redisCacheDeviceInfoHashKey = $"{string.Format(RedisConst.CacheDeviceInfoHashKey, SystemType, ServerTagName)}"; + + Dictionary> keyValuePairsTemps = FreeRedisProvider.Instance.HGetAll>(redisCacheDeviceInfoHashKey); + List meterInfos = new List(); + List focusAddressDataLista = new List(); + foreach (var item in keyValuePairsTemps) + { + foreach (var subItem in item.Value) + { + if (subItem.MeterType == MeterTypeEnum.Ammeter && subItem.TimeDensity == 15) + { + meterInfos.Add(subItem); + focusAddressDataLista.Add(subItem.MeterId.ToString()); + } + } + } List taskList = new List(); foreach (var settingInfo in settingInfos) @@ -304,7 +347,13 @@ namespace JiShe.CollectBus.ScheduledMeterReading } //获取对应的缓存电表信息 - var ammeterInfo = ammeterInfos.First(); + var ammeterInfo = meterInfos.Where(d=>d.MeterId == settingInfo.MeterId).FirstOrDefault(); + if (ammeterInfo == null) + { + _logger.LogError($"{nameof(AmmeterScheduledAutoValveControl)} 电表自动阀控时,未找到对应电表信息,电表Id={settingInfo.MeterId}, -102"); + continue; + } + bool tripStateResult = false; string itemCode = T37612012PacketItemCodeConst.AFN10HFN01H; string subItemCode = string.Empty; @@ -361,7 +410,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading subItemCode: subItemCode, pendingCopyReadTime: currentTime, creationTime: currentTime, - packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl); + packetType: TelemetryPacketTypeEnum.AmmeterAutoValveControl, + _guidGenerator); taskList.Add(meterReadingRecords); } if (taskList == null || taskList.Count <= 0) diff --git a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index f661842..214caf7 100644 --- a/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/services/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -1,4 +1,5 @@ using JiShe.CollectBus.Common.Consts; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.IoTDB.Interface; using JiShe.CollectBus.IotSystems.MeterReadingRecords; using JiShe.CollectBus.Kafka.Attributes; @@ -8,6 +9,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; using TouchSocket.Sockets; +using static FreeSql.Internal.GlobalFilter; namespace JiShe.CollectBus.Subscribers { @@ -46,7 +48,7 @@ namespace JiShe.CollectBus.Subscribers [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOneMinuteIssuedEventName)] public async Task AmmeterScheduledMeterOneMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { - _logger.LogInformation("1分钟采集电表数据下行消息消费队列开始处理"); + _logger.LogError($"1分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); return await SendMessagesAsync(receivedMessage); } @@ -56,10 +58,10 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName)] + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFiveMinuteIssuedEventName, EnableBatch = true, BatchSize = 500)] public async Task AmmeterScheduledMeterFiveMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) { - _logger.LogInformation("5分钟采集电表数据下行消息消费队列开始处理"); + _logger.LogError($"5分钟采集电表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); return await SendMessagesAsync(receivedMessage); } @@ -68,11 +70,16 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - //[KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName)] - public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerFifteenMinuteIssuedEventName, EnableBatch = true, TaskCount = 30, BatchSize = 500)] + public async Task AmmeterScheduledMeterFifteenMinuteReadingIssuedEvent(List receivedMessage) { - _logger.LogInformation("15分钟采集电表数据下行消息消费队列开始处理"); - return await SendMessagesAsync(receivedMessage); + + foreach (var item in receivedMessage) + { + _logger.LogError($"15分钟采集电表数据下行消息消费队列开始处理:{item.Serialize()}"); + await SendMessagesAsync(item); + } + return SubscribeAck.Success(); } /// @@ -80,14 +87,60 @@ namespace JiShe.CollectBus.Subscribers /// /// /// - [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName,EnableBatch =true,TaskCount=30,BatchSize =500)] + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerAutoValveControlIssuedEventName, EnableBatch = true, BatchSize = 500)] public async Task AmmeterScheduledAutoValveControl(List receivedMessage) { //todo 如果是时段自动阀控,需要检查当前的时间,如果时间在自动阀控时间段内,则发送自动阀控报文,否则不发送,尤其是消息队列阻塞或者延时过长的时候。以免造成生产事故。 - //_logger.LogInformation("电表自动阀控下行消息消费队列开始处理"); - _logger.LogWarning($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"); + foreach (var item in receivedMessage) + { + _logger.LogError($"电表自动阀控下行消息消费队列开始处理:{item.Serialize()}"); + await SendMessagesAsync(item); + } + return SubscribeAck.Success(); + } + + /// + /// 其他采集数据下行消息主题,日冻结,月冻结、集中器版本号、SIM卡号、定时校时等下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerOtherIssuedEventName, EnableBatch = true, BatchSize = 500)] + public async Task AmmeterScheduledOther(List receivedMessage) + { + foreach (var item in receivedMessage) + { + _logger.LogError($"其他采集数据下行消息消费队列开始处理:{item.Serialize()}"); + await SendMessagesAsync(item); + } + return SubscribeAck.Success(); + } + + /// + /// 电表手动阀控下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveControlIssuedEventName)] + public async Task AmmeterScheduledManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogError($"电表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); + return SubscribeAck.Success(); + } + + /// + /// 电表手动抄读下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName)] + public async Task AmmeterScheduledManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogError($"电表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); return SubscribeAck.Success(); - //return await SendMessagesAsync(receivedMessage); } /// @@ -132,9 +185,54 @@ namespace JiShe.CollectBus.Subscribers /// /// [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName)] - public async Task WatermeterSubscriberWorkerAutoReadingIssuedEvent(MeterReadingTelemetryPacketInfo receivedMessage) + public async Task WatermeterScheduledAutoReading(MeterReadingTelemetryPacketInfo receivedMessage) { - return await SendMessagesAsync(receivedMessage); + _logger.LogError($"水表数据下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); + return SubscribeAck.Success(); + } + + /// + /// 水表自动阀控下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerAutoValveControlIssuedEventName)] + public async Task WatermeterScheduleAutoValveControl(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogError($"水表自动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); + return SubscribeAck.Success(); + } + + /// + /// 水表手动阀控下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveControlIssuedEventName)] + public async Task WatermeterScheduleManualValveControl(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogError($"水表手动阀控下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); + return SubscribeAck.Success(); + } + + /// + /// 水表手动抄读下行消息消费订阅 + /// + /// + /// + [KafkaSubscribe(ProtocolConst.WatermeterSubscriberWorkerManualValveReadingIssuedEventName)] + public async Task WatermeterScheduleManualValveReading(MeterReadingTelemetryPacketInfo receivedMessage) + { + _logger.LogError($"水表手动抄读下行消息消费队列开始处理:{receivedMessage.Serialize()}"); + + await SendMessagesAsync(receivedMessage); + return SubscribeAck.Success(); } #endregion