From d5e2f33333d1dcdc68c88c9b0a0dfba1d1ca642a Mon Sep 17 00:00:00 2001 From: Ian Jenkins Date: Thu, 24 Aug 2017 10:08:05 +0100 Subject: [PATCH] Use Query Builder for better support across platforms. Not all drivers supported FOR UPDATE queries so this changes changes the raw SQL to use the query builder thus allowing DBAL to construct hte appropriate query per platform. It then grabs the SQL and appends the read lock SQL but grabs the SQL part from the correct platform thus as to not break portability. --- pkg/dbal/DbalConsumer.php | 18 ++-- pkg/dbal/Tests/DbalConsumerTest.php | 135 ++++++++++++++++++++++++++++ pkg/redis/RedisConsumer.php | 4 +- 3 files changed, 149 insertions(+), 8 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 86fe30cc7..5546d9100 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -169,12 +169,18 @@ protected function receiveMessage() try { $now = time(); - $sql = sprintf( - 'SELECT * FROM %s WHERE queue=:queue AND '. - '(delayed_until IS NULL OR delayed_until<=:delayedUntil) '. - 'ORDER BY priority DESC, id ASC LIMIT 1 FOR UPDATE', - $this->context->getTableName() - ); + $query = $this->dbal->createQueryBuilder(); + $query + ->select('*') + ->from($this->context->getTableName()) + ->where('queue = :queue') + ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') + ->orderBy('priority', 'desc') + ->orderBy('id', 'asc') + ->setMaxResults(1) + ; + + $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); $dbalMessage = $this->dbal->executeQuery( $sql, diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index acd0b78a2..3e7695315 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -3,6 +3,8 @@ namespace Enqueue\Dbal\Tests; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Platforms\AbstractPlatform; +use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Statement; use Enqueue\Dbal\DbalConsumer; use Enqueue\Dbal\DbalContext; @@ -148,7 +150,41 @@ public function testShouldReceiveMessage() ->will($this->returnValue($dbalMessage)) ; + $queryBuilder = $this->createQueryBuilderMock(); + $queryBuilder + ->expects($this->once()) + ->method('select') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('from') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('where') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('andWhere') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->exactly(2)) + ->method('orderBy') + ->will($this->returnSelf()) + ; + + $platform = $this->createPlatformMock(); + $dbal = $this->createConnectionMock(); + $dbal + ->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder) + ; $dbal ->expects($this->once()) ->method('executeQuery') @@ -163,6 +199,11 @@ public function testShouldReceiveMessage() ->expects($this->once()) ->method('commit') ; + $dbal + ->expects($this->once()) + ->method('getDatabasePlatform') + ->willReturn($platform) + ; $context = $this->createContextMock(); $context @@ -201,7 +242,41 @@ public function testShouldReturnNullIfThereIsNoNewMessage() ->will($this->returnValue(null)) ; + $queryBuilder = $this->createQueryBuilderMock(); + $queryBuilder + ->expects($this->once()) + ->method('select') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('from') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('where') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('andWhere') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->exactly(2)) + ->method('orderBy') + ->will($this->returnSelf()) + ; + + $platform = $this->createPlatformMock(); + $dbal = $this->createConnectionMock(); + $dbal + ->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder) + ; $dbal ->expects($this->once()) ->method('executeQuery') @@ -216,6 +291,11 @@ public function testShouldReturnNullIfThereIsNoNewMessage() ->expects($this->once()) ->method('commit') ; + $dbal + ->expects($this->once()) + ->method('getDatabasePlatform') + ->willReturn($platform) + ; $context = $this->createContextMock(); $context @@ -250,7 +330,41 @@ public function testShouldThrowIfMessageWasNotRemoved() ->will($this->returnValue(['id' => '2134'])) ; + $queryBuilder = $this->createQueryBuilderMock(); + $queryBuilder + ->expects($this->once()) + ->method('select') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('from') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('where') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->once()) + ->method('andWhere') + ->will($this->returnSelf()) + ; + $queryBuilder + ->expects($this->exactly(2)) + ->method('orderBy') + ->will($this->returnSelf()) + ; + + $platform = $this->createPlatformMock(); + $dbal = $this->createConnectionMock(); + $dbal + ->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder) + ; $dbal ->expects($this->once()) ->method('executeQuery') @@ -269,6 +383,11 @@ public function testShouldThrowIfMessageWasNotRemoved() ->expects($this->once()) ->method('rollBack') ; + $dbal + ->expects($this->once()) + ->method('getDatabasePlatform') + ->willReturn($platform) + ; $context = $this->createContextMock(); $context @@ -318,6 +437,22 @@ private function createContextMock() { return $this->createMock(DbalContext::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|QueryBuilder + */ + private function createQueryBuilderMock() + { + return $this->createMock(QueryBuilder::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AbstractPlatform + */ + private function createPlatformMock() + { + return $this->createMock(AbstractPlatform::class); + } } class InvalidMessage implements PsrMessage diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 86a2399ca..6c57f798c 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -48,8 +48,8 @@ public function receive($timeout = 0) $timeout = (int) ($timeout / 1000); if (empty($timeout)) { // Caused by -// Predis\Response\ServerException: ERR timeout is not an integer or out of range -// /mqdev/vendor/predis/predis/src/Client.php:370 + // Predis\Response\ServerException: ERR timeout is not an integer or out of range + // /mqdev/vendor/predis/predis/src/Client.php:370 return $this->receiveNoWait(); }