diff --git a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs index 7083387..8f37fb4 100644 --- a/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs +++ b/src/JiShe.CollectBus.Application/Plugins/TcpMonitor.cs @@ -24,7 +24,7 @@ namespace JiShe.CollectBus.Plugins { public partial class TcpMonitor : PluginBase, ITransientDependency, ITcpReceivedPlugin, ITcpConnectingPlugin, ITcpConnectedPlugin, ITcpClosedPlugin { - private readonly IPublishEndpoint _producerBus; + private readonly ICapPublisher _producerBus; private readonly ILogger _logger; private readonly IRepository _deviceRepository; private readonly IDistributedCache _ammeterInfoCache; @@ -36,7 +36,7 @@ namespace JiShe.CollectBus.Plugins /// /// /// - public TcpMonitor(IPublishEndpoint producerBus, + public TcpMonitor(ICapPublisher producerBus, ILogger logger, IRepository deviceRepository, IDistributedCache ammeterInfoCache) @@ -160,9 +160,9 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); + await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedLoginEventName, messageReceivedLoginEvent); - await _producerBus.Publish( messageReceivedLoginEvent); + //await _producerBus.Publish( messageReceivedLoginEvent); } private async Task OnTcpHeartbeatReceived(ITcpSessionClient client, string messageHexString, string deviceNo) @@ -199,23 +199,13 @@ namespace JiShe.CollectBus.Plugins DeviceNo = deviceNo, MessageId = NewId.NextGuid().ToString() }; - //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); - await _producerBus.Publish(messageReceivedHeartbeatEvent); + await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedHeartbeatEventName, messageReceivedHeartbeatEvent); + //await _producerBus.Publish(messageReceivedHeartbeatEvent); } private async Task OnTcpNormalReceived(ITcpSessionClient client, string messageHexString, string deviceNo) { - await _producerBus.Publish(new MessageReceived - { - ClientId = client.Id, - ClientIp = client.IP, - ClientPort = client.Port, - MessageHexString = messageHexString, - DeviceNo = deviceNo, - MessageId = NewId.NextGuid().ToString() - }); - - //await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + //await _producerBus.Publish(new MessageReceived //{ // ClientId = client.Id, // ClientIp = client.IP, @@ -224,6 +214,16 @@ namespace JiShe.CollectBus.Plugins // DeviceNo = deviceNo, // MessageId = NewId.NextGuid().ToString() //}); + + await _producerBus.PublishAsync(ProtocolConst.SubscriberReceivedEventName, new MessageReceived + { + ClientId = client.Id, + ClientIp = client.IP, + ClientPort = client.Port, + MessageHexString = messageHexString, + DeviceNo = deviceNo, + MessageId = NewId.NextGuid().ToString() + }); } } }