From 1199813abfc2ac71fbb216c82a77a92d0c363c83 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Wed, 2 Aug 2017 11:09:32 +0300 Subject: [PATCH] implement qos --- pkg/amqp-ext/AmqpContext.php | 8 ++++ pkg/amqp-lib/AmqpConnectionFactory.php | 8 +++- pkg/amqp-lib/AmqpContext.php | 32 ++++++++++--- .../Tests/AmqpConnectionFactoryConfigTest.php | 27 +++++++++++ pkg/amqp-lib/Tests/AmqpContextTest.php | 45 ++++++++++++++----- pkg/amqp-lib/Tests/Spec/AmqpContextTest.php | 2 +- 6 files changed, 103 insertions(+), 19 deletions(-) diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 6d1f40f06..32a135b18 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -255,6 +255,14 @@ public function close() } } + /** + * {@inheritdoc} + */ + public function setQos($prefetchSize, $prefetchCount, $global) + { + $this->getExtChannel()->qos($prefetchSize, $prefetchCount); + } + /** * @return \AMQPChannel */ diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index 9c1f02186..cbee87991 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -33,6 +33,9 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory * 'lazy' => 'the connection will be performed as later as possible, if the option set to true', * 'stream' => 'stream or socket connection', * 'receive_method' => 'Could be either basic_get or basic_consume', + * 'qos_prefetch_size' => 'The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit"', + * 'qos_prefetch_count' => 'Specifies a prefetch window in terms of whole messages.', + * 'qos_global' => 'If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection.', * ] * * or @@ -69,7 +72,7 @@ public function __construct($config = 'amqp://') */ public function createContext() { - return new AmqpContext($this->establishConnection(), $this->config['receive_method']); + return new AmqpContext($this->establishConnection(), $this->config); } /** @@ -224,6 +227,9 @@ private function defaultConfig() 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, 'receive_method' => 'basic_get', + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ]; } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 2f74279db..e3f64776e 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -33,7 +33,7 @@ class AmqpContext implements InteropAmqpContext /** * @var string */ - private $receiveMethod; + private $config; /** * @var Buffer @@ -42,12 +42,18 @@ class AmqpContext implements InteropAmqpContext /** * @param AbstractConnection $connection - * @param string $receiveMethod + * @param array $config */ - public function __construct(AbstractConnection $connection, $receiveMethod) + public function __construct(AbstractConnection $connection, $config = []) { + $this->config = array_replace([ + 'receive_method' => 'basic_get', + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, + ], $config); + $this->connection = $connection; - $this->receiveMethod = $receiveMethod; $this->buffer = new Buffer(); } @@ -99,10 +105,10 @@ public function createConsumer(PsrDestination $destination) $queue = $this->createTemporaryQueue(); $this->bind(new AmqpBind($destination, $queue, $queue->getQueueName())); - return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->receiveMethod); + return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->config['receive_method']); } - return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->receiveMethod); + return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->config['receive_method']); } /** @@ -278,6 +284,14 @@ public function close() } } + /** + * {@inheritdoc} + */ + public function setQos($prefetchSize, $prefetchCount, $global) + { + $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global); + } + /** * @return AMQPChannel */ @@ -285,7 +299,11 @@ private function getChannel() { if (null === $this->channel) { $this->channel = $this->connection->channel(); - $this->channel->basic_qos(0, 1, false); + $this->channel->basic_qos( + $this->config['qos_prefetch_size'], + $this->config['qos_prefetch_count'], + $this->config['qos_global'] + ); } return $this->channel; diff --git a/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php index 31a1ca0ef..42ef02fbb 100644 --- a/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php +++ b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php @@ -81,6 +81,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -107,6 +110,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -131,6 +137,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -155,6 +164,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -179,6 +191,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => '2', 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -203,6 +218,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -227,6 +245,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -251,6 +272,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => 123, 'read_write_timeout' => 321, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; @@ -275,6 +299,9 @@ public static function provideConfigs() 'heartbeat' => 0, 'connection_timeout' => '123', 'read_write_timeout' => '321', + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, ], ]; } diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php index 939dcfd39..80dee492a 100644 --- a/pkg/amqp-lib/Tests/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -47,7 +47,7 @@ public function testShouldDeclareTopic() $topic->addFlag(AmqpTopic::FLAG_INTERNAL); $topic->addFlag(AmqpTopic::FLAG_AUTODELETE); - $session = new AmqpContext($connection, ''); + $session = new AmqpContext($connection); $session->declareTopic($topic); } @@ -77,7 +77,7 @@ public function testShouldDeleteTopic() $topic->addFlag(AmqpTopic::FLAG_IFUNUSED); $topic->addFlag(AmqpTopic::FLAG_NOWAIT); - $session = new AmqpContext($connection, ''); + $session = new AmqpContext($connection); $session->deleteTopic($topic); } @@ -115,7 +115,7 @@ public function testShouldDeclareQueue() $queue->addFlag(AmqpQueue::FLAG_EXCLUSIVE); $queue->addFlag(AmqpQueue::FLAG_NOWAIT); - $session = new AmqpContext($connection, ''); + $session = new AmqpContext($connection); $session->declareQueue($queue); } @@ -146,7 +146,7 @@ public function testShouldDeleteQueue() $queue->addFlag(AmqpQueue::FLAG_IFEMPTY); $queue->addFlag(AmqpQueue::FLAG_NOWAIT); - $session = new AmqpContext($connection, ''); + $session = new AmqpContext($connection); $session->deleteQueue($queue); } @@ -169,7 +169,7 @@ public function testBindShouldBindTopicToTopic() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->bind(new AmqpBind($target, $source, 'routing-key', 12345)); } @@ -192,7 +192,7 @@ public function testBindShouldBindTopicToQueue() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->bind(new AmqpBind($target, $source, 'routing-key', 12345)); $context->bind(new AmqpBind($source, $target, 'routing-key', 12345)); } @@ -216,7 +216,7 @@ public function testShouldUnBindTopicFromTopic() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->unbind(new AmqpBind($target, $source, 'routing-key', 12345)); } @@ -239,7 +239,7 @@ public function testShouldUnBindTopicFromQueue() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->unbind(new AmqpBind($target, $source, 'routing-key', 12345, ['key' => 'value'])); $context->unbind(new AmqpBind($source, $target, 'routing-key', 12345, ['key' => 'value'])); } @@ -259,7 +259,7 @@ public function testShouldCloseChannelConnection() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->createProducer(); $context->close(); @@ -284,10 +284,35 @@ public function testShouldPurgeQueue() ->willReturn($channel) ; - $context = new AmqpContext($connection, ''); + $context = new AmqpContext($connection); $context->purgeQueue($queue); } + public function testShouldSetQos() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->at(0)) + ->method('basic_qos') + ->with($this->identicalTo(0), $this->identicalTo(1), $this->isFalse()) + ; + $channel + ->expects($this->at(1)) + ->method('basic_qos') + ->with($this->identicalTo(123), $this->identicalTo(456), $this->isTrue()) + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection); + $context->setQos(123, 456, true); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|AbstractConnection */ diff --git a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php index 087336d5a..5e3d8bb8e 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpContextTest.php @@ -20,6 +20,6 @@ protected function createContext() ->willReturn($channel) ; - return new AmqpContext($con, ''); + return new AmqpContext($con); } }