diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index 2e5e1fb..248bb92 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -37,14 +37,14 @@ namespace JiShe.CollectBus.ScheduledMeterReading private readonly ICapPublisher _producerBus; private readonly IIoTDBProvider _dbProvider; private readonly IMeterReadingRecordRepository _meterReadingRecordRepository; - private readonly IProducerService _producerService; + private readonly IProducerService _producerService; public BasicScheduledMeterReadingService( ILogger logger, ICapPublisher producerBus, IMeterReadingRecordRepository meterReadingRecordRepository, - IProducerService producerService, + IProducerService producerService, IIoTDBProvider dbProvider) { _producerBus = producerBus; @@ -809,8 +809,8 @@ namespace JiShe.CollectBus.ScheduledMeterReading throw new Exception($"{nameof(KafkaProducerIssuedMessage)} 推送消息失败,参数异常,-101"); } int partition = DeviceGroupBalanceControl.GetDeviceGroupId(taskRecord.FocusAddress); - TopicPartition topicPartition = new TopicPartition(topicName, partition); - await _producerService.ProduceAsync(topicPartition, null, taskRecord); + + await _producerService.ProduceAsync(topicName, partition, taskRecord); } private async Task AmmerterCreatePublishTask(int timeDensity, MeterTypeEnum meterType) @@ -1158,7 +1158,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading FocusAddress = ammerterItem.Value.FocusAddress, TimeDensity = timeDensity.ToString(), }; - await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); + //await _producerBus.PublishAsync(ProtocolConst.WatermeterSubscriberWorkerAutoReadingIssuedEventName, tempMsg); //_ = _producerBus.Publish(tempMsg); diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 32a3b9e..91f7d4a 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -36,7 +36,7 @@ namespace JiShe.CollectBus.ScheduledMeterReading { string serverTagName = string.Empty; public EnergySystemScheduledMeterReadingService(ILogger logger, - ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) + ICapPublisher producerBus, IIoTDBProvider dbProvider, IMeterReadingRecordRepository meterReadingRecordRepository,IConfiguration configuration, IProducerService producerService) : base(logger, producerBus, meterReadingRecordRepository, producerService,dbProvider) { serverTagName = configuration.GetValue(CommonConst.ServerTagName)!; }