diff --git a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs index 0542ede..a210d28 100644 --- a/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs +++ b/src/JiShe.CollectBus.Application/EnergySystem/EnergySystemAppService.cs @@ -5,15 +5,16 @@ using System.Net; using System.Text; using System.Threading.Tasks; using DeviceDetectorNET.Class.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; +using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.EnergySystem.Dto; using JiShe.CollectBus.FreeSql; using JiShe.CollectBus.IotSystems.PrepayModel; using JiShe.CollectBus.IotSystems.Records; +using JiShe.CollectBus.Kafka.Producer; using JiShe.CollectBus.Protocol.Contracts; using MassTransit; using Microsoft.AspNetCore.Mvc; @@ -28,15 +29,15 @@ namespace JiShe.CollectBus.EnergySystem private readonly IRepository _focusRecordRepository; private readonly IRepository _csqRecordRepository; private readonly IRepository _conrOnlineRecordRepository; - private readonly ICapPublisher _capBus; + private readonly IProducerService _producerService; public EnergySystemAppService(IRepository focusRecordRepository, IRepository csqRecordRepository, - IRepository conrOnlineRecordRepository, ICapPublisher capBus) + IRepository conrOnlineRecordRepository, IProducerService producerService) { _focusRecordRepository = focusRecordRepository; _csqRecordRepository = csqRecordRepository; _conrOnlineRecordRepository = conrOnlineRecordRepository; - _capBus = capBus; + _producerService = producerService; } /// @@ -71,14 +72,14 @@ namespace JiShe.CollectBus.EnergySystem return result; - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; result.Data.ValidData = true; @@ -108,14 +109,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } return result; @@ -149,14 +150,14 @@ namespace JiShe.CollectBus.EnergySystem }).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterSetSendCmd(address, meterParameters); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; result.Data.ValidData = true; @@ -178,14 +179,14 @@ namespace JiShe.CollectBus.EnergySystem { var dataUnit = Build645SendData.BuildReadMeterAddressSendDataUnit(detail.MeterAddress); var bytes =Build3761SendData.BuildTransparentForwardingSendCmd(address, detail.Port, detail.BaudRate.ToString(), dataUnit, StopBit.Stop1, Parity.None); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; @@ -261,14 +262,14 @@ namespace JiShe.CollectBus.EnergySystem if (bytes != null) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); if (isManual) { @@ -320,14 +321,14 @@ namespace JiShe.CollectBus.EnergySystem var bytes = Build3761SendData.BuildCommunicationParametersSetSendCmd(address, masterIP, materPort, backupIP, backupPort, input.Data.APN); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; @@ -347,14 +348,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildTerminalCalendarClockSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; @@ -375,14 +376,14 @@ namespace JiShe.CollectBus.EnergySystem bool isManual = !input.AreaCode.Equals("5110");//低功耗集中器不是长连接,在连接的那一刻再发送 var bytes = Build3761SendData.BuildConrCheckTimeSendCmd(address,DateTime.Now, isManual); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; @@ -402,14 +403,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildConrRebootSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; @@ -430,14 +431,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{input.AreaCode}{input.Address}"; var pnList = input.Data.Split(',').Select(it => int.Parse(it)).ToList(); var bytes = Build3761SendData.BuildAmmeterParameterReadingSendCmd(address, pnList); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; return result; @@ -479,14 +480,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; @@ -548,14 +549,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; result.Msg = "操作成功"; @@ -577,14 +578,14 @@ namespace JiShe.CollectBus.EnergySystem var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterReportCollectionItemsSetSendCmd(address,input.Detail.Pn, input.Detail.Unit,input.Detail.Cycle,input.Detail.BaseTime, input.Detail.CurveRatio,input.Detail.Details.Select(it => new PnFn(it.Pn,it.Fn)).ToList()); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; result.Msg = "操作成功"; @@ -605,14 +606,14 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{code.AreaCode}{code.Address}"; var bytes = Build3761SendData.BuildAmmeterAutoUpSwitchSetSendCmd(address, input.Detail.Pn,input.Detail.IsOpen); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; result.Msg = "操作成功"; @@ -631,14 +632,14 @@ namespace JiShe.CollectBus.EnergySystem var result = new BaseResultDto(); var address = $"{input.AreaCode}{input.Address}"; var bytes = Build3761SendData.BuildAmmeterReadAutoUpSwitchSendCmd(address, input.Detail.Pn); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); result.Status = true; result.Msg = "操作成功"; return result; @@ -658,14 +659,14 @@ namespace JiShe.CollectBus.EnergySystem { var address = $"{data.AreaCode}{data.Address}"; var bytes = Build3761SendData.BuildTerminalVersionInfoReadingSendCmd(address); - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; result.Msg = "操作成功"; @@ -713,14 +714,14 @@ namespace JiShe.CollectBus.EnergySystem foreach (var bytes in bytesList) { - await _capBus.PublishAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage + await _producerService.ProduceAsync(ProtocolConst.AmmeterSubscriberWorkerManualValveReadingIssuedEventName, new IssuedEventMessage { //ClientId = messageReceived.ClientId, DeviceNo = address, Message = bytes, Type = IssuedEventType.Data, MessageId = NewId.NextGuid().ToString() - }); + }.Serialize()); } result.Status = true; diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 29427fc..7af99bc 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs index f2a08b1..0f601d9 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/BasicScheduledMeterReadingService.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Ammeters; +using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.BuildSendDatas; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; diff --git a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs index 8593dfd..c1e45bd 100644 --- a/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs +++ b/src/JiShe.CollectBus.Application/ScheduledMeterReading/EnergySystemScheduledMeterReadingService.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using Confluent.Kafka; -using DotNetCore.CAP; using JiShe.CollectBus.Ammeters; using JiShe.CollectBus.Common.Consts; using JiShe.CollectBus.Common.DeviceBalanceControl; diff --git a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs index 9bf297f..fb469ac 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/SubscriberAppService.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Helpers; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.IoTDBProvider; diff --git a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs index f3c485c..95b563c 100644 --- a/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs +++ b/src/JiShe.CollectBus.Application/Subscribers/WorkerSubscriberAppService.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using DeviceDetectorNET.Parser.Device; -using DotNetCore.CAP; using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.IotSystems.Devices; using JiShe.CollectBus.IotSystems.MessageIssueds; diff --git a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs index 23d0917..d066a28 100644 --- a/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs +++ b/src/JiShe.CollectBus.Protocol.Contracts/Abstracts/BaseProtocolPlugin.cs @@ -1,5 +1,4 @@ -using DotNetCore.CAP; -using JiShe.CollectBus.Common.Enums; +using JiShe.CollectBus.Common.Enums; using JiShe.CollectBus.Common.Extensions; using JiShe.CollectBus.Common.Models; using JiShe.CollectBus.Protocol.Contracts.Interfaces;