From 4acb30b5eaf581ece62365289a0820abc154fbc1 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 4 Jul 2017 22:59:35 +0300 Subject: [PATCH 01/15] [bundle] Extend EventDispatcher instead of container aware one. fixes deprecation note. --- pkg/enqueue-bundle/Events/ProxyEventDispatcher.php | 10 +++------- pkg/enqueue-bundle/Resources/config/events.yml | 1 - .../Tests/Unit/Events/ProxyEventDispatcherTest.php | 14 ++++++-------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/pkg/enqueue-bundle/Events/ProxyEventDispatcher.php b/pkg/enqueue-bundle/Events/ProxyEventDispatcher.php index d287fb517..e07d226b9 100644 --- a/pkg/enqueue-bundle/Events/ProxyEventDispatcher.php +++ b/pkg/enqueue-bundle/Events/ProxyEventDispatcher.php @@ -2,12 +2,11 @@ namespace Enqueue\Bundle\Events; -use Symfony\Component\DependencyInjection\ContainerInterface; -use Symfony\Component\EventDispatcher\ContainerAwareEventDispatcher; use Symfony\Component\EventDispatcher\Event; +use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -class ProxyEventDispatcher extends ContainerAwareEventDispatcher +class ProxyEventDispatcher extends EventDispatcher { /** * @var EventDispatcherInterface @@ -20,14 +19,11 @@ class ProxyEventDispatcher extends ContainerAwareEventDispatcher private $asyncListener; /** - * @param ContainerInterface $container * @param EventDispatcherInterface $trueEventDispatcher * @param AsyncListener $asyncListener */ - public function __construct(ContainerInterface $container, EventDispatcherInterface $trueEventDispatcher, AsyncListener $asyncListener) + public function __construct(EventDispatcherInterface $trueEventDispatcher, AsyncListener $asyncListener) { - parent::__construct($container); - $this->trueEventDispatcher = $trueEventDispatcher; $this->asyncListener = $asyncListener; } diff --git a/pkg/enqueue-bundle/Resources/config/events.yml b/pkg/enqueue-bundle/Resources/config/events.yml index c35309360..e33976b8c 100644 --- a/pkg/enqueue-bundle/Resources/config/events.yml +++ b/pkg/enqueue-bundle/Resources/config/events.yml @@ -14,7 +14,6 @@ services: enqueue.events.event_dispatcher: class: 'Enqueue\Bundle\Events\ProxyEventDispatcher' arguments: - - '@service_container' - '@event_dispatcher' - '@enqueue.events.async_listener' diff --git a/pkg/enqueue-bundle/Tests/Unit/Events/ProxyEventDispatcherTest.php b/pkg/enqueue-bundle/Tests/Unit/Events/ProxyEventDispatcherTest.php index 5c70b5b49..3698e7d1c 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Events/ProxyEventDispatcherTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Events/ProxyEventDispatcherTest.php @@ -6,8 +6,6 @@ use Enqueue\Bundle\Events\ProxyEventDispatcher; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; -use Symfony\Component\DependencyInjection\Container; -use Symfony\Component\EventDispatcher\ContainerAwareEventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\GenericEvent; @@ -15,9 +13,9 @@ class ProxyEventDispatcherTest extends TestCase { use ClassExtensionTrait; - public function testShouldBeSubClassOfContainerAwareEventDispatcher() + public function testShouldBeSubClassOfEventDispatcher() { - $this->assertClassExtends(ContainerAwareEventDispatcher::class, ProxyEventDispatcher::class); + $this->assertClassExtends(EventDispatcher::class, ProxyEventDispatcher::class); } public function testShouldSetSyncModeForGivenEventNameOnDispatchAsyncListenersOnly() @@ -34,7 +32,7 @@ public function testShouldSetSyncModeForGivenEventNameOnDispatchAsyncListenersOn ; $trueEventDispatcher = new EventDispatcher(); - $dispatcher = new ProxyEventDispatcher(new Container(), $trueEventDispatcher, $asyncListenerMock); + $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $asyncListenerMock); $event = new GenericEvent(); $dispatcher->dispatchAsyncListenersOnly('theEvent', $event); @@ -51,7 +49,7 @@ public function testShouldCallAsyncEventButNotOtherOnDispatchAsyncListenersOnly( }); $asyncEventWasCalled = false; - $dispatcher = new ProxyEventDispatcher(new Container(), $trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theEvent', function () use (&$asyncEventWasCalled) { $this->assertInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); @@ -76,7 +74,7 @@ public function testShouldCallOtherEventIfDispatchedFromAsyncEventOnDispatchAsyn }); $asyncEventWasCalled = false; - $dispatcher = new ProxyEventDispatcher(new Container(), $trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theEvent', function () use (&$asyncEventWasCalled) { $this->assertInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); @@ -99,7 +97,7 @@ public function testShouldNotCallAsyncEventIfDispatchedFromOtherEventOnDispatchA func_get_arg(2)->dispatch('theOtherAsyncEvent'); }); - $dispatcher = new ProxyEventDispatcher(new Container(), $trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theAsyncEvent', function () { func_get_arg(2)->dispatch('theOtherEvent'); }); From 4b3b55418a8c7879ef9e1106df24153cb5ff1e6e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 10:21:03 +0300 Subject: [PATCH 02/15] Async event dispatcher pkg. --- composer.json | 5 ++ docs/bundle/async_events.md | 8 +-- phpunit.xml.dist | 4 ++ pkg/async-event-dispatcher/.gitignore | 6 ++ pkg/async-event-dispatcher/.travis.yml | 21 +++++++ .../AsyncListener.php | 2 +- .../AsyncProcessor.php | 2 +- .../ContainerAwareRegistry.php | 2 +- .../AsyncEventDispatcherExtension.php | 32 ++++++++++ .../DependencyInjection/AsyncEventsPass.php | 2 +- .../AsyncTransformersPass.php | 2 +- .../EventTransformer.php | 2 +- pkg/async-event-dispatcher/LICENSE | 19 ++++++ .../OldProxyEventDispatcher.php | 61 +++++++++++++++++++ .../PhpSerializerEventTransformer.php | 2 +- .../ProxyEventDispatcher.php | 2 +- pkg/async-event-dispatcher/README.md | 27 ++++++++ .../Registry.php | 2 +- .../Resources/config/services.yml} | 10 +-- .../Tests}/AsyncListenerTest.php | 8 +-- .../Tests}/AsyncProcessorTest.php | 10 +-- .../Tests}/ContainerAwareRegistryTest.php | 10 +-- .../PhpSerializerEventTransformerTest.php | 6 +- .../Tests}/ProxyEventDispatcherTest.php | 6 +- pkg/async-event-dispatcher/composer.json | 40 ++++++++++++ pkg/async-event-dispatcher/phpunit.xml.dist | 31 ++++++++++ .../DependencyInjection/EnqueueExtension.php | 4 +- pkg/enqueue-bundle/EnqueueBundle.php | 4 +- .../App/TestAsyncEventTransformer.php | 2 +- .../Functional/Events/AsyncListenerTest.php | 2 +- .../Functional/Events/AsyncProcessorTest.php | 4 +- .../Functional/Events/AsyncSubscriberTest.php | 2 +- pkg/enqueue-bundle/composer.json | 3 +- 33 files changed, 296 insertions(+), 47 deletions(-) create mode 100644 pkg/async-event-dispatcher/.gitignore create mode 100644 pkg/async-event-dispatcher/.travis.yml rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/AsyncListener.php (97%) rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/AsyncProcessor.php (97%) rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/ContainerAwareRegistry.php (98%) create mode 100644 pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/DependencyInjection/AsyncEventsPass.php (98%) rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/DependencyInjection/AsyncTransformersPass.php (95%) rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/EventTransformer.php (94%) create mode 100644 pkg/async-event-dispatcher/LICENSE create mode 100644 pkg/async-event-dispatcher/OldProxyEventDispatcher.php rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/PhpSerializerEventTransformer.php (96%) rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/ProxyEventDispatcher.php (97%) create mode 100644 pkg/async-event-dispatcher/README.md rename pkg/{enqueue-bundle/Events => async-event-dispatcher}/Registry.php (88%) rename pkg/{enqueue-bundle/Resources/config/events.yml => async-event-dispatcher/Resources/config/services.yml} (68%) rename pkg/{enqueue-bundle/Tests/Unit/Events => async-event-dispatcher/Tests}/AsyncListenerTest.php (94%) rename pkg/{enqueue-bundle/Tests/Unit/Events => async-event-dispatcher/Tests}/AsyncProcessorTest.php (93%) rename pkg/{enqueue-bundle/Tests/Unit/Events => async-event-dispatcher/Tests}/ContainerAwareRegistryTest.php (92%) rename pkg/{enqueue-bundle/Tests/Unit/Events => async-event-dispatcher/Tests}/PhpSerializerEventTransformerTest.php (94%) rename pkg/{enqueue-bundle/Tests/Unit/Events => async-event-dispatcher/Tests}/ProxyEventDispatcherTest.php (96%) create mode 100644 pkg/async-event-dispatcher/composer.json create mode 100644 pkg/async-event-dispatcher/phpunit.xml.dist diff --git a/composer.json b/composer.json index 1c557ec8c..729bed129 100644 --- a/composer.json +++ b/composer.json @@ -19,6 +19,7 @@ "enqueue/job-queue": "*@dev", "enqueue/simple-client": "*@dev", "enqueue/test": "*@dev", + "enqueue/async-event-dispatcher": "*@dev", "phpunit/phpunit": "^5", "doctrine/doctrine-bundle": "~1.2", @@ -98,6 +99,10 @@ { "type": "path", "url": "pkg/simple-client" + }, + { + "type": "path", + "url": "pkg/async-event-dispatcher" } ] } diff --git a/docs/bundle/async_events.md b/docs/bundle/async_events.md index 89787c73b..f71e2ef3a 100644 --- a/docs/bundle/async_events.md +++ b/docs/bundle/async_events.md @@ -66,7 +66,7 @@ You can also add an async listener directly and register a custom message proces services: acme.async_foo_listener: - class: 'Enqueue\Bundle\Events\AsyncListener' + class: 'Enqueue\AsyncEventDispatcher\AsyncListener' public: false arguments: ['@enqueue.client.producer', '@enqueue.events.registry'] tags: @@ -78,7 +78,7 @@ The message processor must subscribe to `event.foo` topic. The message queue top ```php pkg/simple-client/Tests + + + pkg/async-event-dispatcher/Tests + diff --git a/pkg/async-event-dispatcher/.gitignore b/pkg/async-event-dispatcher/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/async-event-dispatcher/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/async-event-dispatcher/.travis.yml b/pkg/async-event-dispatcher/.travis.yml new file mode 100644 index 000000000..aaa1849c3 --- /dev/null +++ b/pkg/async-event-dispatcher/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source --ignore-platform-reqs + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/enqueue-bundle/Events/AsyncListener.php b/pkg/async-event-dispatcher/AsyncListener.php similarity index 97% rename from pkg/enqueue-bundle/Events/AsyncListener.php rename to pkg/async-event-dispatcher/AsyncListener.php index ae4f5b242..ce075b49a 100644 --- a/pkg/enqueue-bundle/Events/AsyncListener.php +++ b/pkg/async-event-dispatcher/AsyncListener.php @@ -1,6 +1,6 @@ load('services.yml'); + + if (version_compare(Kernel::VERSION, '3.3', '<')) { + $container->setDefinition('enqueue.events.async_processor', new Definition(OldProxyEventDispatcher::class, [ + new Reference('service_container'), + new Reference('enqueue.events.registry'), + new Reference('enqueue.events.event_dispatcher'), + ])); + } + } +} diff --git a/pkg/enqueue-bundle/Events/DependencyInjection/AsyncEventsPass.php b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventsPass.php similarity index 98% rename from pkg/enqueue-bundle/Events/DependencyInjection/AsyncEventsPass.php rename to pkg/async-event-dispatcher/DependencyInjection/AsyncEventsPass.php index f02c041c6..1db803539 100644 --- a/pkg/enqueue-bundle/Events/DependencyInjection/AsyncEventsPass.php +++ b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventsPass.php @@ -1,6 +1,6 @@ trueEventDispatcher = $trueEventDispatcher; + $this->asyncListener = $asyncListener; + } + + /** + * This method dispatches only those listeners that were marked as async. + * + * @param string $eventName + * @param Event|null $event + */ + public function dispatchAsyncListenersOnly($eventName, Event $event = null) + { + try { + $this->asyncListener->syncMode($eventName); + + parent::dispatch($eventName, $event); + } finally { + $this->asyncListener->resetSyncMode(); + } + } + + /** + * {@inheritdoc} + */ + public function dispatch($eventName, Event $event = null) + { + parent::dispatch($eventName, $event); + + $this->trueEventDispatcher->dispatch($eventName, $event); + } +} diff --git a/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php b/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php similarity index 96% rename from pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php rename to pkg/async-event-dispatcher/PhpSerializerEventTransformer.php index 1625f0fdc..91bf0e27d 100644 --- a/pkg/enqueue-bundle/Events/PhpSerializerEventTransformer.php +++ b/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php @@ -1,6 +1,6 @@ setContainer($container); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The container must return instance of Enqueue\Bundle\Events\EventTransformer but got stdClass'); + $this->expectExceptionMessage('The container must return instance of Enqueue\AsyncEventDispatcher\EventTransformer but got stdClass'); $registry->getTransformer('fooTrans'); } diff --git a/pkg/enqueue-bundle/Tests/Unit/Events/PhpSerializerEventTransformerTest.php b/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php similarity index 94% rename from pkg/enqueue-bundle/Tests/Unit/Events/PhpSerializerEventTransformerTest.php rename to pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php index 5556d552d..5b364fab3 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Events/PhpSerializerEventTransformerTest.php +++ b/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php @@ -1,9 +1,9 @@ =5.6", + "enqueue/psr-queue": "^0.5@dev", + "symfony/event-dispatcher": "^2.8|^3" + }, + "require-dev": { + "phpunit/phpunit": "~5.5", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3", + "symfony/http-kernel": "^2.8|^3", + "enqueue/null": "^0.5@dev" + }, + "suggest": { + "symfony/dependency-injection": "^2.8|^3 If you'd like to use async event dispatcher container extension." + }, + "autoload": { + "psr-4": { "Enqueue\\AsyncEventDispatcher\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.5.x-dev" + } + } +} diff --git a/pkg/async-event-dispatcher/phpunit.xml.dist b/pkg/async-event-dispatcher/phpunit.xml.dist new file mode 100644 index 000000000..e64c86d98 --- /dev/null +++ b/pkg/async-event-dispatcher/phpunit.xml.dist @@ -0,0 +1,31 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Resources + ./Tests + + + + diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 6bd3ce72a..67c950ddd 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -2,6 +2,7 @@ namespace Enqueue\Bundle\DependencyInjection; +use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension; use Enqueue\Client\TraceableProducer; use Enqueue\JobQueue\Job; use Enqueue\Null\Symfony\NullTransportFactory; @@ -118,7 +119,8 @@ public function load(array $configs, ContainerBuilder $container) } if (isset($config['async_events']['enabled'])) { - $loader->load('events.yml'); + $extension = new AsyncEventDispatcherExtension(); + $extension->load([], $container); if (false == empty($config['async_events']['spool_producer'])) { $container->getDefinition('enqueue.events.async_listener') diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index c047e77ac..3ce3480d1 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -5,6 +5,8 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory; +use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventsPass; +use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass; @@ -13,8 +15,6 @@ use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\DependencyInjection\EnqueueExtension; -use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass; -use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass; use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\Symfony\DbalTransportFactory; use Enqueue\Fs\FsContext; diff --git a/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php b/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php index b7ecb1930..6dab2c6fc 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Functional\App; -use Enqueue\Bundle\Events\EventTransformer; +use Enqueue\AsyncEventDispatcher\EventTransformer; use Enqueue\Client\Message; use Enqueue\Psr\PsrMessage; use Enqueue\Util\JSON; diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php index a59b765d9..09d562862 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Functional\Events; -use Enqueue\Bundle\Events\AsyncListener; +use Enqueue\AsyncEventDispatcher\AsyncListener; use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener; use Enqueue\Bundle\Tests\Functional\WebTestCase; use Enqueue\Client\TraceableProducer; diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php index fc95843ec..95d611425 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php @@ -2,8 +2,8 @@ namespace Enqueue\Bundle\Tests\Functional\Events; -use Enqueue\Bundle\Events\AsyncListener; -use Enqueue\Bundle\Events\AsyncProcessor; +use Enqueue\AsyncEventDispatcher\AsyncListener; +use Enqueue\AsyncEventDispatcher\AsyncProcessor; use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener; use Enqueue\Bundle\Tests\Functional\App\TestAsyncSubscriber; use Enqueue\Bundle\Tests\Functional\WebTestCase; diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php index e7dd3d8cb..39d32ab87 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Functional\Events; -use Enqueue\Bundle\Events\AsyncListener; +use Enqueue\AsyncEventDispatcher\AsyncListener; use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener; use Enqueue\Bundle\Tests\Functional\WebTestCase; use Enqueue\Client\TraceableProducer; diff --git a/pkg/enqueue-bundle/composer.json b/pkg/enqueue-bundle/composer.json index 3e120ece2..3088bd208 100644 --- a/pkg/enqueue-bundle/composer.json +++ b/pkg/enqueue-bundle/composer.json @@ -14,7 +14,8 @@ "php": ">=5.6", "symfony/framework-bundle": "^2.8|^3", "enqueue/enqueue": "^0.5@dev", - "enqueue/null": "^0.5@dev" + "enqueue/null": "^0.5@dev", + "enqueue/async-event-dispatcher": "^0.5@dev" }, "require-dev": { "phpunit/phpunit": "~5.5", From 1bab448d4627a8ddec3711a5fffe0908b5d2e5fe Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 10:31:09 +0300 Subject: [PATCH 03/15] fix --- .../DependencyInjection/AsyncEventDispatcherExtension.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php index d3a2617af..928d91ffc 100644 --- a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php +++ b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php @@ -22,7 +22,7 @@ public function load(array $configs, ContainerBuilder $container) $loader->load('services.yml'); if (version_compare(Kernel::VERSION, '3.3', '<')) { - $container->setDefinition('enqueue.events.async_processor', new Definition(OldProxyEventDispatcher::class, [ + $container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldProxyEventDispatcher::class, [ new Reference('service_container'), new Reference('enqueue.events.registry'), new Reference('enqueue.events.event_dispatcher'), From bcfed1576cbb5ca4d0bc2775fd076bad40c76fcf Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 10:38:40 +0300 Subject: [PATCH 04/15] fix --- .../DependencyInjection/AsyncEventDispatcherExtension.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php index 928d91ffc..02a66429a 100644 --- a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php +++ b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php @@ -24,8 +24,8 @@ public function load(array $configs, ContainerBuilder $container) if (version_compare(Kernel::VERSION, '3.3', '<')) { $container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldProxyEventDispatcher::class, [ new Reference('service_container'), - new Reference('enqueue.events.registry'), - new Reference('enqueue.events.event_dispatcher'), + new Reference('event_dispatcher'), + new Reference('enqueue.events.async_listener'), ])); } } From 08a4378b84e2f612835968ff2e393d17927e4e93 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:06:47 +0300 Subject: [PATCH 05/15] [async-event] Move async event classes to own package. --- pkg/async-event-dispatcher/.gitignore | 1 + ...ispatcher.php => AsyncEventDispatcher.php} | 2 +- pkg/async-event-dispatcher/AsyncListener.php | 36 +++- pkg/async-event-dispatcher/AsyncProcessor.php | 25 ++- .../AsyncEventDispatcherExtension.php | 8 +- .../DependencyInjection/Configuration.php | 24 +++ .../EventTransformer.php | 11 +- ...atcher.php => OldAsyncEventDispatcher.php} | 2 +- .../PhpSerializerEventTransformer.php | 31 ++- .../Resources/config/services.yml | 22 +- pkg/async-event-dispatcher/SimpleRegistry.php | 77 +++++++ .../Tests/AsyncListenerTest.php | 73 +++++-- .../Tests/AsyncProcessorTest.php | 6 +- .../Tests/Functional/UseCasesTest.php | 199 ++++++++++++++++++ .../PhpSerializerEventTransformerTest.php | 32 ++- .../Tests/ProxyEventDispatcherTest.php | 20 +- .../Tests/SimpleRegistryTest.php | 97 +++++++++ pkg/async-event-dispatcher/composer.json | 1 + .../DependencyInjection/Configuration.php | 3 - .../DependencyInjection/EnqueueExtension.php | 10 +- .../Tests/Functional/App/AsyncListener.php | 50 +++++ .../App/TestAsyncEventTransformer.php | 32 ++- .../Tests/Functional/App/config/config.yml | 7 + .../Functional/Events/AsyncListenerTest.php | 12 +- .../Functional/Events/AsyncSubscriberTest.php | 8 +- 25 files changed, 693 insertions(+), 96 deletions(-) rename pkg/async-event-dispatcher/{ProxyEventDispatcher.php => AsyncEventDispatcher.php} (96%) create mode 100644 pkg/async-event-dispatcher/DependencyInjection/Configuration.php rename pkg/async-event-dispatcher/{OldProxyEventDispatcher.php => OldAsyncEventDispatcher.php} (96%) create mode 100644 pkg/async-event-dispatcher/SimpleRegistry.php create mode 100644 pkg/async-event-dispatcher/Tests/Functional/UseCasesTest.php create mode 100644 pkg/async-event-dispatcher/Tests/SimpleRegistryTest.php create mode 100644 pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php diff --git a/pkg/async-event-dispatcher/.gitignore b/pkg/async-event-dispatcher/.gitignore index a770439e5..243f6687a 100644 --- a/pkg/async-event-dispatcher/.gitignore +++ b/pkg/async-event-dispatcher/.gitignore @@ -4,3 +4,4 @@ /phpunit.xml /vendor/ /.idea/ +Tests/Functional/queues \ No newline at end of file diff --git a/pkg/async-event-dispatcher/ProxyEventDispatcher.php b/pkg/async-event-dispatcher/AsyncEventDispatcher.php similarity index 96% rename from pkg/async-event-dispatcher/ProxyEventDispatcher.php rename to pkg/async-event-dispatcher/AsyncEventDispatcher.php index 20f145827..c74d9745b 100644 --- a/pkg/async-event-dispatcher/ProxyEventDispatcher.php +++ b/pkg/async-event-dispatcher/AsyncEventDispatcher.php @@ -6,7 +6,7 @@ use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -class ProxyEventDispatcher extends EventDispatcher +class AsyncEventDispatcher extends EventDispatcher { /** * @var EventDispatcherInterface diff --git a/pkg/async-event-dispatcher/AsyncListener.php b/pkg/async-event-dispatcher/AsyncListener.php index ce075b49a..f7b3cf1b8 100644 --- a/pkg/async-event-dispatcher/AsyncListener.php +++ b/pkg/async-event-dispatcher/AsyncListener.php @@ -2,35 +2,42 @@ namespace Enqueue\AsyncEventDispatcher; -use Enqueue\Client\Message; -use Enqueue\Client\ProducerInterface; +use Enqueue\Psr\PsrContext; +use Enqueue\Psr\PsrQueue; use Symfony\Component\EventDispatcher\Event; class AsyncListener { /** - * @var ProducerInterface + * @var PsrContext */ - private $producer; + private $context; /** * @var Registry */ private $registry; + /** + * @var PsrQueue + */ + private $eventQueue; + /** * @var bool */ private $syncMode; /** - * @param ProducerInterface $producer - * @param Registry $registry + * @param PsrContext $context + * @param Registry $registry + * @param PsrQueue|string $eventQueue */ - public function __construct(ProducerInterface $producer, Registry $registry) + public function __construct(PsrContext $context, Registry $registry, $eventQueue) { - $this->producer = $producer; + $this->context = $context; $this->registry = $registry; + $this->eventQueue = $eventQueue instanceof PsrQueue ? $eventQueue : $context->createQueue($eventQueue); } public function resetSyncMode() @@ -46,6 +53,16 @@ public function syncMode($eventName) $this->syncMode[$eventName] = true; } + /** + * @param string $eventName + * + * @return bool + */ + public function isSyncMode($eventName) + { + return isset($this->syncMode[$eventName]); + } + /** * @param Event $event * @param string $eventName @@ -56,11 +73,10 @@ public function onEvent(Event $event = null, $eventName) $transformerName = $this->registry->getTransformerNameForEvent($eventName); $message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event); - $message->setScope(Message::SCOPE_APP); $message->setProperty('event_name', $eventName); $message->setProperty('transformer_name', $transformerName); - $this->producer->sendEvent('event.'.$eventName, $message); + $this->context->createProducer()->send($this->eventQueue, $message); } } } diff --git a/pkg/async-event-dispatcher/AsyncProcessor.php b/pkg/async-event-dispatcher/AsyncProcessor.php index edffb20a0..1343bdac7 100644 --- a/pkg/async-event-dispatcher/AsyncProcessor.php +++ b/pkg/async-event-dispatcher/AsyncProcessor.php @@ -6,6 +6,7 @@ use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Psr\PsrProcessor; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; class AsyncProcessor implements PsrProcessor { @@ -15,18 +16,28 @@ class AsyncProcessor implements PsrProcessor private $registry; /** - * @var ProxyEventDispatcher + * @var AsyncEventDispatcher|OldAsyncEventDispatcher */ - private $eventDispatcher; + private $dispatcher; /** - * @param Registry $registry - * @param ProxyEventDispatcher $eventDispatcher + * @param Registry $registry + * @param EventDispatcherInterface $dispatcher */ - public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher) + public function __construct(Registry $registry, EventDispatcherInterface $dispatcher) { $this->registry = $registry; - $this->eventDispatcher = $eventDispatcher; + + if (false == ($dispatcher instanceof AsyncEventDispatcher || $dispatcher instanceof OldAsyncEventDispatcher)) { + throw new \InvalidArgumentException(sprintf( + 'The dispatcher argument must be either instance of "%s" or "%s" but got "%s"', + AsyncEventDispatcher::class, + OldAsyncEventDispatcher::class, + get_class($dispatcher) + )); + } + + $this->dispatcher = $dispatcher; } /** @@ -43,7 +54,7 @@ public function process(PsrMessage $message, PsrContext $context) $event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message); - $this->eventDispatcher->dispatchAsyncListenersOnly($eventName, $event); + $this->dispatcher->dispatchAsyncListenersOnly($eventName, $event); return self::ACK; } diff --git a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php index 02a66429a..160999d03 100644 --- a/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php +++ b/pkg/async-event-dispatcher/DependencyInjection/AsyncEventDispatcherExtension.php @@ -2,7 +2,7 @@ namespace Enqueue\AsyncEventDispatcher\DependencyInjection; -use Enqueue\AsyncEventDispatcher\OldProxyEventDispatcher; +use Enqueue\AsyncEventDispatcher\OldAsyncEventDispatcher; use Symfony\Component\Config\FileLocator; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Definition; @@ -18,11 +18,15 @@ class AsyncEventDispatcherExtension extends Extension */ public function load(array $configs, ContainerBuilder $container) { + $config = $this->processConfiguration(new Configuration(), $configs); + + $container->setAlias('enqueue.events.context', $config['context_service']); + $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yml'); if (version_compare(Kernel::VERSION, '3.3', '<')) { - $container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldProxyEventDispatcher::class, [ + $container->setDefinition('enqueue.events.event_dispatcher', new Definition(OldAsyncEventDispatcher::class, [ new Reference('service_container'), new Reference('event_dispatcher'), new Reference('enqueue.events.async_listener'), diff --git a/pkg/async-event-dispatcher/DependencyInjection/Configuration.php b/pkg/async-event-dispatcher/DependencyInjection/Configuration.php new file mode 100644 index 000000000..9703e6284 --- /dev/null +++ b/pkg/async-event-dispatcher/DependencyInjection/Configuration.php @@ -0,0 +1,24 @@ +root('enqueue_async_event_dispatcher'); + + $rootNode->children() + ->scalarNode('context_service')->isRequired()->cannotBeEmpty()->end() + ; + + return $tb; + } +} diff --git a/pkg/async-event-dispatcher/EventTransformer.php b/pkg/async-event-dispatcher/EventTransformer.php index c4eaf07c5..eb9af7caf 100644 --- a/pkg/async-event-dispatcher/EventTransformer.php +++ b/pkg/async-event-dispatcher/EventTransformer.php @@ -2,8 +2,6 @@ namespace Enqueue\AsyncEventDispatcher; -use Enqueue\Client\Message; -use Enqueue\Consumption\Result; use Enqueue\Psr\PsrMessage; use Symfony\Component\EventDispatcher\Event; @@ -13,18 +11,19 @@ interface EventTransformer * @param string $eventName * @param Event|null $event * - * @return Message + * @return PsrMessage */ - public function toMessage($eventName, Event $event = null); + public function toMessage($eventName, Event $event); /** * If you able to transform message back to event return it. - * If you failed to transform for some reason you can return instance of Result object ( Like this Result::reject() );. + * If you failed to transform for some reason you can return a string status (@see PsrProcess constants) or an object that implements __toString method. + * The object must have a __toString method is supposed to be used as PsrProcessor::process return value. * * @param string $eventName * @param PsrMessage $message * - * @return Event|Result|null + * @return Event|string|object */ public function toEvent($eventName, PsrMessage $message); } diff --git a/pkg/async-event-dispatcher/OldProxyEventDispatcher.php b/pkg/async-event-dispatcher/OldAsyncEventDispatcher.php similarity index 96% rename from pkg/async-event-dispatcher/OldProxyEventDispatcher.php rename to pkg/async-event-dispatcher/OldAsyncEventDispatcher.php index 87731d9c8..97720af08 100644 --- a/pkg/async-event-dispatcher/OldProxyEventDispatcher.php +++ b/pkg/async-event-dispatcher/OldAsyncEventDispatcher.php @@ -7,7 +7,7 @@ use Symfony\Component\EventDispatcher\Event; use Symfony\Component\EventDispatcher\EventDispatcherInterface; -class OldProxyEventDispatcher extends ContainerAwareEventDispatcher +class OldAsyncEventDispatcher extends ContainerAwareEventDispatcher { /** * @var EventDispatcherInterface diff --git a/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php b/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php index 91bf0e27d..03f9027da 100644 --- a/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php +++ b/pkg/async-event-dispatcher/PhpSerializerEventTransformer.php @@ -2,13 +2,33 @@ namespace Enqueue\AsyncEventDispatcher; -use Enqueue\Client\Message; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Symfony\Component\EventDispatcher\Event; use Symfony\Component\HttpKernel\Kernel; class PhpSerializerEventTransformer implements EventTransformer { + /** + * @var PsrContext + */ + private $context; + + /** + * @var bool + */ + private $skipSymfonyVersionCheck; + + /** + * @param PsrContext $context + * @param bool $skipSymfonyVersionCheck It is useful when async dispatcher is used without Kernel. So there is no way to check the version. + */ + public function __construct(PsrContext $context, $skipSymfonyVersionCheck = false) + { + $this->context = $context; + $this->skipSymfonyVersionCheck = $skipSymfonyVersionCheck; + } + /** * {@inheritdoc} */ @@ -16,10 +36,7 @@ public function toMessage($eventName, Event $event = null) { $this->assertSymfony30OrHigher(); - $message = new Message(); - $message->setBody(serialize($event)); - - return $message; + return $this->context->createMessage(serialize($event)); } /** @@ -34,6 +51,10 @@ public function toEvent($eventName, PsrMessage $message) private function assertSymfony30OrHigher() { + if ($this->skipSymfonyVersionCheck) { + return; + } + if (version_compare(Kernel::VERSION, '3.0', '<')) { throw new \LogicException( 'This transformer does not work on Symfony prior 3.0. '. diff --git a/pkg/async-event-dispatcher/Resources/config/services.yml b/pkg/async-event-dispatcher/Resources/config/services.yml index 3fe8bfaf9..94dd698e9 100644 --- a/pkg/async-event-dispatcher/Resources/config/services.yml +++ b/pkg/async-event-dispatcher/Resources/config/services.yml @@ -1,4 +1,10 @@ +parameters: + enqueue_events_queue: 'symfony_events' + services: + # should be defined by the extension + # enqueue.events.context: + enqueue.events.registry: class: 'Enqueue\AsyncEventDispatcher\ContainerAwareRegistry' public: false @@ -8,11 +14,11 @@ services: enqueue.events.async_listener: class: 'Enqueue\AsyncEventDispatcher\AsyncListener' - arguments: ['@enqueue.client.producer', '@enqueue.events.registry'] + arguments: ['@enqueue.events.context', '@enqueue.events.registry', '%enqueue_events_queue%'] enqueue.events.event_dispatcher: - class: 'Enqueue\AsyncEventDispatcher\ProxyEventDispatcher' + class: 'Enqueue\AsyncEventDispatcher\AsyncEventDispatcher' arguments: - '@event_dispatcher' - '@enqueue.events.async_listener' @@ -22,8 +28,18 @@ services: arguments: - '@enqueue.events.registry' - '@enqueue.events.event_dispatcher' + tags: + - + name: 'enqueue.client.processor' + topicName: '__command__' + processorName: '%enqueue_events_queue%' + queueName: '%enqueue_events_queue%' + queueNameHardcoded: true + exclusive: true enqueue.events.php_serializer_event_transofrmer: class: 'Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer' + arguments: + - '@enqueue.events.context' tags: - - {name: 'enqueue.event_transformer', eventName: '/.*/', transformerName: 'php_serializer' } \ No newline at end of file + - {name: 'enqueue.event_transformer', eventName: '/.*/', transformerName: 'php_serializer' } diff --git a/pkg/async-event-dispatcher/SimpleRegistry.php b/pkg/async-event-dispatcher/SimpleRegistry.php new file mode 100644 index 000000000..2f39d0cac --- /dev/null +++ b/pkg/async-event-dispatcher/SimpleRegistry.php @@ -0,0 +1,77 @@ + transformerName] + * @param string[] $transformersMap [transformerName => transformerObject] + */ + public function __construct(array $eventsMap, array $transformersMap) + { + $this->eventsMap = $eventsMap; + $this->transformersMap = $transformersMap; + } + + /** + * {@inheritdoc} + */ + public function getTransformerNameForEvent($eventName) + { + $transformerName = null; + if (array_key_exists($eventName, $this->eventsMap)) { + $transformerName = $this->eventsMap[$eventName]; + } else { + foreach ($this->eventsMap as $eventNamePattern => $name) { + if ('/' != $eventNamePattern[0]) { + continue; + } + + if (preg_match($eventNamePattern, $eventName)) { + $transformerName = $name; + + break; + } + } + } + + if (empty($transformerName)) { + throw new \LogicException(sprintf('There is no transformer registered for the given event %s', $eventName)); + } + + return $transformerName; + } + + /** + * {@inheritdoc} + */ + public function getTransformer($name) + { + if (false == array_key_exists($name, $this->transformersMap)) { + throw new \LogicException(sprintf('There is no transformer named %s', $name)); + } + + $transformer = $this->transformersMap[$name]; + + if (false == $transformer instanceof EventTransformer) { + throw new \LogicException(sprintf( + 'The container must return instance of %s but got %s', + EventTransformer::class, + is_object($transformer) ? get_class($transformer) : gettype($transformer) + )); + } + + return $transformer; + } +} diff --git a/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php b/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php index 2d2481b18..5fa73dbb2 100644 --- a/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php +++ b/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php @@ -5,8 +5,10 @@ use Enqueue\AsyncEventDispatcher\AsyncListener; use Enqueue\AsyncEventDispatcher\EventTransformer; use Enqueue\AsyncEventDispatcher\Registry; -use Enqueue\Client\Message; -use Enqueue\Client\ProducerInterface; +use Enqueue\Null\NullMessage; +use Enqueue\Null\NullQueue; +use Enqueue\Psr\PsrContext; +use Enqueue\Psr\PsrProducer; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; use Symfony\Component\EventDispatcher\GenericEvent; @@ -15,17 +17,44 @@ class AsyncListenerTest extends TestCase { use ClassExtensionTrait; - public function testCouldBeConstructedWithRegistryAndProxyEventDispatcher() + public function testCouldBeConstructedWithContextAndRegistryAndEventQueueAsString() { - new AsyncListener($this->createProducerMock(), $this->createRegistryMock()); + $eventQueue = new NullQueue('symfony_events'); + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('symfony_events') + ->willReturn($eventQueue) + ; + + $listener = new AsyncListener($context, $this->createRegistryMock(), 'symfony_events'); + + $this->assertAttributeSame($eventQueue, 'eventQueue', $listener); + } + + public function testCouldBeConstructedWithContextAndRegistryAndPsrQueue() + { + $eventQueue = new NullQueue('symfony_events'); + + $context = $this->createContextMock(); + $context + ->expects($this->never()) + ->method('createQueue') + ; + + $listener = new AsyncListener($context, $this->createRegistryMock(), $eventQueue); + + $this->assertAttributeSame($eventQueue, 'eventQueue', $listener); } public function testShouldDoNothingIfSyncModeOn() { - $producer = $this->createProducerMock(); + $producer = $this->createContextMock(); $producer ->expects($this->never()) - ->method('sendEvent') + ->method('createProducer') ; $registry = $this->createRegistryMock(); @@ -34,7 +63,7 @@ public function testShouldDoNothingIfSyncModeOn() ->method('getTransformerNameForEvent') ; - $listener = new AsyncListener($producer, $registry); + $listener = new AsyncListener($producer, $registry, new NullQueue('symfony_events')); $listener->syncMode('fooEvent'); @@ -46,8 +75,8 @@ public function testShouldSendMessageIfSyncModeOff() { $event = new GenericEvent(); - $message = new Message(); - $message->setBody('serializedEvent'); + $message = new NullMessage('serializedEvent'); + $eventQueue = new NullQueue('symfony_events'); $transformerMock = $this->createEventTransformerMock(); $transformerMock @@ -74,11 +103,19 @@ public function testShouldSendMessageIfSyncModeOff() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('sendEvent') - ->with('event.fooEvent', $this->identicalTo($message)) + ->method('send') + ->with($this->identicalTo($eventQueue), $this->identicalTo($message)) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('createProducer') + ->with() + ->willReturn($producer) ; - $listener = new AsyncListener($producer, $registry); + $listener = new AsyncListener($context, $registry, $eventQueue); $listener->onEvent($event, 'fooEvent'); @@ -99,11 +136,19 @@ private function createEventTransformerMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer */ private function createProducerMock() { - return $this->createMock(ProducerInterface::class); + return $this->createMock(PsrProducer::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + */ + private function createContextMock() + { + return $this->createMock(PsrContext::class); } /** diff --git a/pkg/async-event-dispatcher/Tests/AsyncProcessorTest.php b/pkg/async-event-dispatcher/Tests/AsyncProcessorTest.php index 8e8b93929..f0d5eaed1 100644 --- a/pkg/async-event-dispatcher/Tests/AsyncProcessorTest.php +++ b/pkg/async-event-dispatcher/Tests/AsyncProcessorTest.php @@ -2,9 +2,9 @@ namespace Enqueue\AsyncEventDispatcher\Tests; +use Enqueue\AsyncEventDispatcher\AsyncEventDispatcher; use Enqueue\AsyncEventDispatcher\AsyncProcessor; use Enqueue\AsyncEventDispatcher\EventTransformer; -use Enqueue\AsyncEventDispatcher\ProxyEventDispatcher; use Enqueue\AsyncEventDispatcher\Registry; use Enqueue\Consumption\Result; use Enqueue\Null\NullContext; @@ -105,11 +105,11 @@ private function createEventTransformerMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|ProxyEventDispatcher + * @return \PHPUnit_Framework_MockObject_MockObject|AsyncEventDispatcher */ private function createProxyEventDispatcherMock() { - return $this->createMock(ProxyEventDispatcher::class); + return $this->createMock(AsyncEventDispatcher::class); } /** diff --git a/pkg/async-event-dispatcher/Tests/Functional/UseCasesTest.php b/pkg/async-event-dispatcher/Tests/Functional/UseCasesTest.php new file mode 100644 index 000000000..e83eda9db --- /dev/null +++ b/pkg/async-event-dispatcher/Tests/Functional/UseCasesTest.php @@ -0,0 +1,199 @@ +remove(__DIR__.'/queues/'); + + // it could be any other enqueue/psr-queue compatible context. + $this->context = $context = (new FsConnectionFactory('file://'.__DIR__.'/queues'))->createContext(); + $this->queue = $queue = $context->createQueue('symfony_events'); + + $registry = new SimpleRegistry( + [ + 'test_async' => 'test_async', + 'test_async_from_async' => 'test_async', + ], + [ + 'test_async' => new TestAsyncEventTransformer($context), + ]); + + $asyncListener = new AsyncListener($context, $registry, $queue); + $this->asyncListener = function ($event, $name, $dispatcher) use ($asyncListener) { + $asyncListener->onEvent($event, $name); + + $consumer = $this->context->createConsumer($this->queue); + + $message = $consumer->receiveNoWait(); + + if ($message) { + $consumer->reject($message, true); + + echo "Send message for event: $name\n"; + } + }; + + $this->dispatcher = $dispatcher = new EventDispatcher(); + + $this->asyncDispatcher = $asyncDispatcher = new AsyncEventDispatcher($dispatcher, $asyncListener); + + $this->asyncProcessor = new AsyncProcessor($registry, $asyncDispatcher); + } + + public function testShouldDispatchBothAsyncEventAndSyncOne() + { + $this->dispatcher->addListener('test_async', function () { + echo "Sync event\n"; + }); + + $this->dispatcher->addListener('test_async', $this->asyncListener); + + $this->asyncDispatcher->addListener('test_async', function ($event, $eventName) { + echo "Async event\n"; + }); + + $this->dispatcher->dispatch('test_async', new GenericEvent()); + $this->processMessages(); + + $this->expectOutputString("Sync event\nSend message for event: test_async\nAsync event\n"); + } + + public function testShouldDispatchBothAsyncEventAndSyncOneFromWhenDispatchedFromInsideAnotherEvent() + { + $this->dispatcher->addListener('foo', function ($event, $name, EventDispatcherInterface $dispatcher) { + echo "Foo event\n"; + + $dispatcher->dispatch('test_async', new GenericEvent()); + }); + + $this->dispatcher->addListener('test_async', function () { + echo "Sync event\n"; + }); + + $this->dispatcher->addListener('test_async', $this->asyncListener); + + $this->asyncDispatcher->addListener('test_async', function ($event, $eventName) { + echo "Async event\n"; + }); + + $this->dispatcher->dispatch('foo'); + $this->processMessages(); + + $this->expectOutputString("Foo event\nSync event\nSend message for event: test_async\nAsync event\n"); + } + + public function testShouldDispatchOtherAsyncEventFromAsyncEvent() + { + $this->dispatcher->addListener('test_async', $this->asyncListener); + $this->dispatcher->addListener('test_async_from_async', $this->asyncListener); + + $this->asyncDispatcher->addListener('test_async', function ($event, $eventName, EventDispatcherInterface $dispatcher) { + echo "Async event\n"; + + $dispatcher->dispatch('test_async_from_async'); + }); + + $this->dispatcher->addListener('test_async_from_async', function ($event, $eventName, EventDispatcherInterface $dispatcher) { + echo "Async event from event\n"; + }); + + $this->dispatcher->dispatch('test_async'); + + $this->processMessages(); + $this->processMessages(); + + $this->expectOutputString("Send message for event: test_async\nAsync event\nSend message for event: test_async_from_async\nAsync event from event\n"); + } + + public function testShouldDispatchSyncListenerIfDispatchedFromAsycListner() + { + $this->dispatcher->addListener('test_async', $this->asyncListener); + + $this->dispatcher->addListener('sync', function () { + echo "Sync event\n"; + }); + + $this->asyncDispatcher->addListener('test_async', function ($event, $eventName, EventDispatcherInterface $dispatcher) { + echo "Async event\n"; + + $dispatcher->dispatch('sync'); + }); + + $this->dispatcher->dispatch('test_async'); + + $this->processMessages(); + + $this->expectOutputString("Send message for event: test_async\nAsync event\nSync event\n"); + } + + private function processMessages() + { + $consumer = $this->context->createConsumer($this->queue); + if ($message = $consumer->receiveNoWait()) { + $result = $this->asyncProcessor->process($message, $this->context); + + switch ((string) $result) { + case PsrProcessor::ACK: + $consumer->acknowledge($message); + break; + case PsrProcessor::REJECT: + $consumer->reject($message); + break; + case PsrProcessor::REQUEUE: + $consumer->reject($message, true); + break; + default: + throw new \LogicException('Result is not supported'); + } + } + } +} diff --git a/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php b/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php index 5b364fab3..b2a00a4ab 100644 --- a/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php +++ b/pkg/async-event-dispatcher/Tests/PhpSerializerEventTransformerTest.php @@ -4,8 +4,9 @@ use Enqueue\AsyncEventDispatcher\EventTransformer; use Enqueue\AsyncEventDispatcher\PhpSerializerEventTransformer; -use Enqueue\Client\Message; use Enqueue\Null\NullMessage; +use Enqueue\Psr\PsrContext; +use Enqueue\Psr\PsrMessage; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; use Symfony\Component\EventDispatcher\GenericEvent; @@ -22,7 +23,7 @@ public function testShouldImplementEventTransformerInterface() public function testCouldBeConstructedWithoutAnyArguments() { - new PhpSerializerEventTransformer(); + new PhpSerializerEventTransformer($this->createContextStub()); } public function testShouldReturnMessageWithPhpSerializedEventAsBodyOnToMessage() @@ -31,14 +32,14 @@ public function testShouldReturnMessageWithPhpSerializedEventAsBodyOnToMessage() $this->markTestSkipped('This functionality only works on Symfony 3.0 or higher'); } - $transformer = new PhpSerializerEventTransformer(); + $transformer = new PhpSerializerEventTransformer($this->createContextStub()); $event = new GenericEvent('theSubject'); $expectedBody = serialize($event); $message = $transformer->toMessage('fooEvent', $event); - $this->assertInstanceOf(Message::class, $message); + $this->assertInstanceOf(PsrMessage::class, $message); $this->assertEquals($expectedBody, $message->getBody()); } @@ -51,7 +52,7 @@ public function testShouldReturnEventUnserializedFromMessageBodyOnToEvent() $message = new NullMessage(); $message->setBody(serialize(new GenericEvent('theSubject'))); - $transformer = new PhpSerializerEventTransformer(); + $transformer = new PhpSerializerEventTransformer($this->createContextStub()); $event = $transformer->toEvent('anEventName', $message); @@ -68,7 +69,7 @@ public function testThrowNotSupportedExceptionOnSymfonyPrior30OnToMessage() $this->expectException(\LogicException::class); $this->expectExceptionMessage('This transformer does not work on Symfony prior 3.0.'); - $transformer = new PhpSerializerEventTransformer(); + $transformer = new PhpSerializerEventTransformer($this->createContextStub()); $transformer->toMessage(new GenericEvent()); } @@ -82,8 +83,25 @@ public function testThrowNotSupportedExceptionOnSymfonyPrior30OnToEvent() $this->expectException(\LogicException::class); $this->expectExceptionMessage('This transformer does not work on Symfony prior 3.0.'); - $transformer = new PhpSerializerEventTransformer(); + $transformer = new PhpSerializerEventTransformer($this->createContextStub()); $transformer->toEvent('anEvent', new NullMessage()); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext + */ + private function createContextStub() + { + $context = $this->createMock(PsrContext::class); + $context + ->expects($this->any()) + ->method('createMessage') + ->willReturnCallback(function ($body) { + return new NullMessage($body); + }) + ; + + return $context; + } } diff --git a/pkg/async-event-dispatcher/Tests/ProxyEventDispatcherTest.php b/pkg/async-event-dispatcher/Tests/ProxyEventDispatcherTest.php index 5706aa294..5e5c6e969 100644 --- a/pkg/async-event-dispatcher/Tests/ProxyEventDispatcherTest.php +++ b/pkg/async-event-dispatcher/Tests/ProxyEventDispatcherTest.php @@ -2,8 +2,8 @@ namespace Enqueue\AsyncEventDispatcher\Tests; +use Enqueue\AsyncEventDispatcher\AsyncEventDispatcher; use Enqueue\AsyncEventDispatcher\AsyncListener; -use Enqueue\AsyncEventDispatcher\ProxyEventDispatcher; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; use Symfony\Component\EventDispatcher\EventDispatcher; @@ -15,7 +15,7 @@ class ProxyEventDispatcherTest extends TestCase public function testShouldBeSubClassOfEventDispatcher() { - $this->assertClassExtends(EventDispatcher::class, ProxyEventDispatcher::class); + $this->assertClassExtends(EventDispatcher::class, AsyncEventDispatcher::class); } public function testShouldSetSyncModeForGivenEventNameOnDispatchAsyncListenersOnly() @@ -32,7 +32,7 @@ public function testShouldSetSyncModeForGivenEventNameOnDispatchAsyncListenersOn ; $trueEventDispatcher = new EventDispatcher(); - $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $asyncListenerMock); + $dispatcher = new AsyncEventDispatcher($trueEventDispatcher, $asyncListenerMock); $event = new GenericEvent(); $dispatcher->dispatchAsyncListenersOnly('theEvent', $event); @@ -43,15 +43,15 @@ public function testShouldCallAsyncEventButNotOtherOnDispatchAsyncListenersOnly( $otherEventWasCalled = false; $trueEventDispatcher = new EventDispatcher(); $trueEventDispatcher->addListener('theEvent', function () use (&$otherEventWasCalled) { - $this->assertInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); + $this->assertInstanceOf(AsyncEventDispatcher::class, func_get_arg(2)); $otherEventWasCalled = true; }); $asyncEventWasCalled = false; - $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new AsyncEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theEvent', function () use (&$asyncEventWasCalled) { - $this->assertInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); + $this->assertInstanceOf(AsyncEventDispatcher::class, func_get_arg(2)); $asyncEventWasCalled = true; }); @@ -68,15 +68,15 @@ public function testShouldCallOtherEventIfDispatchedFromAsyncEventOnDispatchAsyn $otherEventWasCalled = false; $trueEventDispatcher = new EventDispatcher(); $trueEventDispatcher->addListener('theOtherEvent', function () use (&$otherEventWasCalled) { - $this->assertNotInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); + $this->assertNotInstanceOf(AsyncEventDispatcher::class, func_get_arg(2)); $otherEventWasCalled = true; }); $asyncEventWasCalled = false; - $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new AsyncEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theEvent', function () use (&$asyncEventWasCalled) { - $this->assertInstanceOf(ProxyEventDispatcher::class, func_get_arg(2)); + $this->assertInstanceOf(AsyncEventDispatcher::class, func_get_arg(2)); $asyncEventWasCalled = true; @@ -97,7 +97,7 @@ public function testShouldNotCallAsyncEventIfDispatchedFromOtherEventOnDispatchA func_get_arg(2)->dispatch('theOtherAsyncEvent'); }); - $dispatcher = new ProxyEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); + $dispatcher = new AsyncEventDispatcher($trueEventDispatcher, $this->createAsyncLisenerMock()); $dispatcher->addListener('theAsyncEvent', function () { func_get_arg(2)->dispatch('theOtherEvent'); }); diff --git a/pkg/async-event-dispatcher/Tests/SimpleRegistryTest.php b/pkg/async-event-dispatcher/Tests/SimpleRegistryTest.php new file mode 100644 index 000000000..328ed1780 --- /dev/null +++ b/pkg/async-event-dispatcher/Tests/SimpleRegistryTest.php @@ -0,0 +1,97 @@ +assertClassImplements(Registry::class, SimpleRegistry::class); + } + + public function testCouldBeConstructedWithEventsMapAndTransformersMapAsArguments() + { + new SimpleRegistry([], []); + } + + public function testShouldAllowGetTransportNameByEventName() + { + $registry = new SimpleRegistry([ + 'fooEvent' => 'fooTrans', + ], []); + + $this->assertEquals('fooTrans', $registry->getTransformerNameForEvent('fooEvent')); + } + + public function testShouldAllowDefineTransportNameAsRegExpPattern() + { + $registry = new SimpleRegistry([ + '/.*/' => 'fooRegExpTrans', + 'fooEvent' => 'fooTrans', + ], []); + + // guard + $this->assertEquals('fooTrans', $registry->getTransformerNameForEvent('fooEvent')); + + $this->assertEquals('fooRegExpTrans', $registry->getTransformerNameForEvent('fooRegExpEvent')); + } + + public function testThrowIfNotSupportedEventGiven() + { + $registry = new SimpleRegistry([ + 'fooEvent' => 'fooTrans', + ], []); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('There is no transformer registered for the given event fooNotSupportedEvent'); + $registry->getTransformerNameForEvent('fooNotSupportedEvent'); + } + + public function testThrowIfThereIsNoRegisteredTransformerWithSuchName() + { + $registry = new SimpleRegistry([], [ + 'fooTrans' => 'foo_trans_id', + ]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('There is no transformer named fooNotRegisteredName'); + $registry->getTransformer('fooNotRegisteredName'); + } + + public function testThrowIfObjectAssocWithTransportNameNotInstanceOfEventTransformer() + { + $container = new Container(); + $container->set('foo_trans_id', new \stdClass()); + + $registry = new SimpleRegistry([], [ + 'fooTrans' => new \stdClass(), + ]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The container must return instance of Enqueue\AsyncEventDispatcher\EventTransformer but got stdClass'); + $registry->getTransformer('fooTrans'); + } + + public function testShouldReturnEventTransformer() + { + $eventTransformerMock = $this->createMock(EventTransformer::class); + + $container = new Container(); + $container->set('foo_trans_id', $eventTransformerMock); + + $registry = new SimpleRegistry([], [ + 'fooTrans' => $eventTransformerMock, + ]); + + $this->assertSame($eventTransformerMock, $registry->getTransformer('fooTrans')); + } +} diff --git a/pkg/async-event-dispatcher/composer.json b/pkg/async-event-dispatcher/composer.json index 7d2e170e6..e4adc18b2 100644 --- a/pkg/async-event-dispatcher/composer.json +++ b/pkg/async-event-dispatcher/composer.json @@ -20,6 +20,7 @@ "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3", "symfony/http-kernel": "^2.8|^3", + "symfony/filesystem": "^2.8|^3", "enqueue/null": "^0.5@dev" }, "suggest": { diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index bfbf5a1d2..7fe7b13db 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -53,9 +53,6 @@ public function getConfigTreeBuilder() ->booleanNode('job')->defaultFalse()->end() ->arrayNode('async_events') ->canBeEnabled() - ->children() - ->booleanNode('spool_producer')->defaultFalse()->end() - ->end() ->end() ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end() diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 67c950ddd..18a5e0207 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -120,13 +120,9 @@ public function load(array $configs, ContainerBuilder $container) if (isset($config['async_events']['enabled'])) { $extension = new AsyncEventDispatcherExtension(); - $extension->load([], $container); - - if (false == empty($config['async_events']['spool_producer'])) { - $container->getDefinition('enqueue.events.async_listener') - ->replaceArgument(0, new Reference('enqueue.client.spool_producer')) - ; - } + $extension->load([[ + 'context_service' => 'enqueue.transport.default.context', + ]], $container); } if ($config['extensions']['doctrine_ping_connection_extension']) { diff --git a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php new file mode 100644 index 000000000..b55525fc4 --- /dev/null +++ b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php @@ -0,0 +1,50 @@ +producer = $producer; + $this->registry = $registry; + } + + /** + * @param Event $event + * @param string $eventName + */ + public function onEvent(Event $event = null, $eventName) + { + if (false == $this->isSyncMode($eventName)) { + $transformerName = $this->registry->getTransformerNameForEvent($eventName); + + $psrMessage = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event); + $message = new Message($psrMessage->getBody()); + $message->setScope(Message::SCOPE_APP); + $message->setProperty('event_name', $eventName); + $message->setProperty('transformer_name', $transformerName); + + $this->producer->sendCommand('symfony_events', $message); + } + } +} diff --git a/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php b/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php index 6dab2c6fc..c95f0648b 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/TestAsyncEventTransformer.php @@ -3,7 +3,7 @@ namespace Enqueue\Bundle\Tests\Functional\App; use Enqueue\AsyncEventDispatcher\EventTransformer; -use Enqueue\Client\Message; +use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrMessage; use Enqueue\Util\JSON; use Symfony\Component\EventDispatcher\Event; @@ -11,26 +11,44 @@ class TestAsyncEventTransformer implements EventTransformer { - public function toMessage($eventName, Event $event = null) + /** + * @var PsrContext + */ + private $context; + + /** + * @param PsrContext $context + */ + public function __construct(PsrContext $context) { + $this->context = $context; + } + + public function toMessage($eventName, Event $event) + { + if (Event::class === get_class($event)) { + return $this->context->createMessage(json_encode('')); + } + /** @var GenericEvent $event */ if (false == $event instanceof GenericEvent) { throw new \LogicException('Must be GenericEvent'); } - $message = new Message(); - $message->setBody([ + return $this->context->createMessage(json_encode([ 'subject' => $event->getSubject(), 'arguments' => $event->getArguments(), - ]); - - return $message; + ])); } public function toEvent($eventName, PsrMessage $message) { $data = JSON::decode($message->getBody()); + if ('' === $data) { + return new Event(); + } + return new GenericEvent($data['subject'], $data['arguments']); } } diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index e95a59da5..d21a6ec34 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -64,6 +64,13 @@ services: test_async_event_transformer: class: 'Enqueue\Bundle\Tests\Functional\App\TestAsyncEventTransformer' + arguments: + - '@enqueue.transport.default.context' tags: - {name: 'enqueue.event_transformer', eventName: 'test_async', transformerName: 'test_async' } - {name: 'enqueue.event_transformer', eventName: 'test_async_subscriber', transformerName: 'test_async' } + + # overwrite async listener with one based on client producer. so we can use traceable producer. + enqueue.events.async_listener: + class: 'Enqueue\Bundle\Tests\Functional\App\AsyncListener' + arguments: ['@enqueue.client.producer', '@enqueue.events.registry'] \ No newline at end of file diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php index 09d562862..a802d2000 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Bundle\Tests\Functional\Events; -use Enqueue\AsyncEventDispatcher\AsyncListener; +use Enqueue\Bundle\Events\AsyncListener; use Enqueue\Bundle\Tests\Functional\App\TestAsyncListener; use Enqueue\Bundle\Tests\Functional\WebTestCase; use Enqueue\Client\TraceableProducer; @@ -38,7 +38,7 @@ public function testShouldNotCallRealListenerIfMarkedAsAsync() $this->assertEmpty($listener->calls); } - public function testShouldSendMessageToExpectedTopicInsteadOfCallingRealListener() + public function testShouldSendMessageToExpectedCommandInsteadOfCallingRealListener() { /** @var EventDispatcherInterface $dispatcher */ $dispatcher = $this->container->get('event_dispatcher'); @@ -50,11 +50,11 @@ public function testShouldSendMessageToExpectedTopicInsteadOfCallingRealListener /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(1, $traces); - $this->assertEquals('event.test_async', $traces[0]['topic']); + $this->assertEquals('symfony_events', $traces[0]['command']); $this->assertEquals('{"subject":"theSubject","arguments":{"fooArg":"fooVal"}}', $traces[0]['body']); } @@ -70,7 +70,7 @@ public function testShouldSendMessageForEveryDispatchCall() /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(3, $traces); } @@ -89,7 +89,7 @@ public function testShouldSendMessageIfDispatchedFromInsideListener() /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(1, $traces); } diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php index 39d32ab87..7a1979b15 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php @@ -50,11 +50,11 @@ public function testShouldSendMessageToExpectedTopicInsteadOfCallingRealSubscrib /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async_subscriber'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(1, $traces); - $this->assertEquals('event.test_async_subscriber', $traces[0]['topic']); + $this->assertEquals('symfony_events', $traces[0]['command']); $this->assertEquals('{"subject":"theSubject","arguments":{"fooArg":"fooVal"}}', $traces[0]['body']); } @@ -70,7 +70,7 @@ public function testShouldSendMessageForEveryDispatchCall() /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async_subscriber'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(3, $traces); } @@ -89,7 +89,7 @@ public function testShouldSendMessageIfDispatchedFromInsideListener() /** @var TraceableProducer $producer */ $producer = $this->container->get('enqueue.producer'); - $traces = $producer->getTopicTraces('event.test_async_subscriber'); + $traces = $producer->getCommandTraces('symfony_events'); $this->assertCount(1, $traces); } From f30446736598eca5096cb1c0fc8595222f7c027a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:16:46 +0300 Subject: [PATCH 06/15] fix phpstan. --- pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php index b55525fc4..125f31f3c 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php @@ -7,7 +7,7 @@ use Enqueue\Client\ProducerInterface; use Symfony\Component\EventDispatcher\Event; -class AsyncListener extends \Enqueue\AsyncEventDispatcher\AsyncListener +class AsyncListener { /** * @var ProducerInterface From 1783753903d3c57422bfa9aac60832c725710c4e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:20:04 +0300 Subject: [PATCH 07/15] fix doc. --- docs/bundle/async_events.md | 51 +------------------------------------ 1 file changed, 1 insertion(+), 50 deletions(-) diff --git a/docs/bundle/async_events.md b/docs/bundle/async_events.md index f71e2ef3a..77051d6c6 100644 --- a/docs/bundle/async_events.md +++ b/docs/bundle/async_events.md @@ -68,60 +68,11 @@ services: acme.async_foo_listener: class: 'Enqueue\AsyncEventDispatcher\AsyncListener' public: false - arguments: ['@enqueue.client.producer', '@enqueue.events.registry'] + arguments: ['@enqueue.transport.default.context', '@enqueue.events.registry', 'a_queue_name'] tags: - { name: 'kernel.event_listener', event: 'foo', method: 'onEvent' } ``` -The message processor must subscribe to `event.foo` topic. The message queue topics names for event follow this patter `event.{eventName}`. - -```php -registry = $registry; - } - - public function process(PsrMessage $message, PsrContext $context) - { - if (false == $eventName = $message->getProperty('event_name')) { - return self::REJECT; - } - if (false == $transformerName = $message->getProperty('transformer_name')) { - return self::REJECT; - } - - // do what you want with the event. - $event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message); - - - return self::ACK; - } - - public static function getSubscribedTopics() - { - return ['event.foo']; - } -} -``` - ## Event transformer From cc33fa89e6d3be55d480512c9fa77b7239d27f18 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:32:51 +0300 Subject: [PATCH 08/15] add quick tour for async event dispatcher. --- docs/async_event_dispatcher/quick_tour.md | 112 +++++++++++++++++++ docs/index.md | 2 + pkg/async-event-dispatcher/AsyncListener.php | 7 +- 3 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 docs/async_event_dispatcher/quick_tour.md diff --git a/docs/async_event_dispatcher/quick_tour.md b/docs/async_event_dispatcher/quick_tour.md new file mode 100644 index 000000000..ad1de6b32 --- /dev/null +++ b/docs/async_event_dispatcher/quick_tour.md @@ -0,0 +1,112 @@ +# Async event dispatcher (Symfony) + +The doc shows how you can setup async event dispatching in plain PHP. +If you are looking for the ways to use it in Symfony application [read this post instead](../bundle/async_events.md) + +* [Installation](#installation) +* [Configuration](#configuration) +* [Dispatch event](#dispatch-event) +* [Process async events](#process-async-events) + +## Installation + +You need the async dispatcher library and a one of [the supported transports](../transport) + +```bash +$ composer require enqueue/async-event-dispatcher enqueue/fs +``` + +## Configuration + +```php +createContext(); +$eventQueue = $context->createQueue('symfony_events'); + +$registry = new SimpleRegistry( + ['the_event' => 'default'], + ['default' => new PhpSerializerEventTransformer($context, true)] +); + +$asyncListener = new AsyncListener($context, $registry, $eventQueue); + +$dispatcher = new EventDispatcher(); + +// the listener sends even as a message through MQ +$dispatcher->addListener('the_event', $asyncListener); + +$asyncDispatcher = new AsyncEventDispatcher($dispatcher, $asyncListener); + +// the listener is executed on consumer side. +$asyncDispatcher->addListener('the_event', function() { +}); + +$asyncProcessor = new AsyncProcessor($registry, $asyncDispatcher); +``` + +## Dispatch event + +```php +dispatch('the_event', new GenericEvent('theSubject')); +``` + +## Process async events + +```php +createConsumer($eventQueue); + +while (true) { + if ($message = $consumer->receive(5000)) { + $result = $asyncProcessor->process($message, $context); + + switch ((string) $result) { + case PsrProcessor::ACK: + $consumer->acknowledge($message); + break; + case PsrProcessor::REJECT: + $consumer->reject($message); + break; + case PsrProcessor::REQUEUE: + $consumer->reject($message, true); + break; + default: + throw new \LogicException('Result is not supported'); + } + } +} +``` + +[back to index](../index.md) diff --git a/docs/index.md b/docs/index.md index 9cfeed118..49ae4c2bf 100644 --- a/docs/index.md +++ b/docs/index.md @@ -34,6 +34,8 @@ - [Production settings](bundle/production_settings.md) - [Debuging](bundle/debuging.md) - [Functional testing](bundle/functional_testing.md) +* Async event dispatcher (Symfony) + - [Quick tour](async_event_dispatcher/quick_tour.md) * Magento - [Quick tour](magento/quick_tour.md) - [Cli commands](magento/cli_commands.md) diff --git a/pkg/async-event-dispatcher/AsyncListener.php b/pkg/async-event-dispatcher/AsyncListener.php index f7b3cf1b8..b8e6e9f8e 100644 --- a/pkg/async-event-dispatcher/AsyncListener.php +++ b/pkg/async-event-dispatcher/AsyncListener.php @@ -40,6 +40,11 @@ public function __construct(PsrContext $context, Registry $registry, $eventQueue $this->eventQueue = $eventQueue instanceof PsrQueue ? $eventQueue : $context->createQueue($eventQueue); } + public function __invoke(Event $event, $eventName) + { + $this->onEvent($event, $eventName); + } + public function resetSyncMode() { $this->syncMode = []; @@ -67,7 +72,7 @@ public function isSyncMode($eventName) * @param Event $event * @param string $eventName */ - public function onEvent(Event $event = null, $eventName) + public function onEvent(Event $event, $eventName) { if (false == isset($this->syncMode[$eventName])) { $transformerName = $this->registry->getTransformerNameForEvent($eventName); From d6f8c124c7d26cd60cdfe798cc33e3b364bc7d65 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:39:26 +0300 Subject: [PATCH 09/15] fix tests. --- pkg/async-event-dispatcher/Tests/AsyncListenerTest.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php b/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php index 5fa73dbb2..5abfb41d0 100644 --- a/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php +++ b/pkg/async-event-dispatcher/Tests/AsyncListenerTest.php @@ -11,6 +11,7 @@ use Enqueue\Psr\PsrProducer; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; +use Symfony\Component\EventDispatcher\Event; use Symfony\Component\EventDispatcher\GenericEvent; class AsyncListenerTest extends TestCase @@ -67,7 +68,7 @@ public function testShouldDoNothingIfSyncModeOn() $listener->syncMode('fooEvent'); - $listener->onEvent(null, 'fooEvent'); + $listener->onEvent(new Event(), 'fooEvent'); $listener->onEvent(new GenericEvent(), 'fooEvent'); } From 836f2d13f8a1946113aed33b8c51d90f4b418455 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 16:46:13 +0300 Subject: [PATCH 10/15] fix tests. --- .../Tests/Functional/App/AsyncListener.php | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php index 125f31f3c..13a014dab 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php @@ -19,6 +19,8 @@ class AsyncListener */ private $registry; + private $syncMode = []; + /** * @param ProducerInterface $producer * @param Registry $registry @@ -29,6 +31,34 @@ public function __construct(ProducerInterface $producer, Registry $registry) $this->registry = $registry; } + public function __invoke(Event $event, $eventName) + { + $this->onEvent($event, $eventName); + } + + public function resetSyncMode() + { + $this->syncMode = []; + } + + /** + * @param string $eventName + */ + public function syncMode($eventName) + { + $this->syncMode[$eventName] = true; + } + + /** + * @param string $eventName + * + * @return bool + */ + public function isSyncMode($eventName) + { + return isset($this->syncMode[$eventName]); + } + /** * @param Event $event * @param string $eventName From 27e67b0e8aad91bfa73ebdea95bd30bdd532b624 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 17:02:18 +0300 Subject: [PATCH 11/15] fix phpstan issues. --- .travis.yml | 1 + phpstan.neon | 3 ++- .../Tests/Spec/GearmanSendToAndReceiveFromQueueTest.php | 3 ++- .../Tests/Spec/GearmanSendToAndReceiveNoWaitFromQueueTest.php | 3 ++- .../Tests/Spec/GearmanSendToTopicAndReceiveFromQueueTest.php | 3 ++- .../Spec/GearmanSendToTopicAndReceiveNoWaitFromQueueTest.php | 3 ++- .../Tests/Spec/PheanstalkSendToAndReceiveFromQueueTest.php | 3 ++- .../Spec/PheanstalkSendToAndReceiveNoWaitFromQueueTest.php | 3 ++- .../Spec/PheanstalkSendToTopicAndReceiveFromQueueTest.php | 3 ++- .../PheanstalkSendToTopicAndReceiveNoWaitFromQueueTest.php | 3 ++- 10 files changed, 19 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7fef74f22..11c35b087 100644 --- a/.travis.yml +++ b/.travis.yml @@ -57,6 +57,7 @@ script: # misssing pkg/amqp-ext pkg/job-queue pkg/redis - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/enqueue pkg/psr-queue pkg/fs pkg/simple-client; fi - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/stomp pkg/dbal pkg/enqueue-bundle pkg/null pkg/sqs pkg/test; fi + - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/gearman pkg/pheanstalk pkg/async-event-dispatcher; fi - if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi - if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi - if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi diff --git a/phpstan.neon b/phpstan.neon index be35ef23b..09b247b81 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,3 +1,4 @@ parameters: excludes_analyse: - - pkg/enqueue/Util/UUID.php \ No newline at end of file + - pkg/enqueue/Util/UUID.php + - pkg/enqueue-bundle/Tests/Functional/App \ No newline at end of file diff --git a/pkg/gearman/Tests/Spec/GearmanSendToAndReceiveFromQueueTest.php b/pkg/gearman/Tests/Spec/GearmanSendToAndReceiveFromQueueTest.php index ff88ea8dd..6117bb0c8 100644 --- a/pkg/gearman/Tests/Spec/GearmanSendToAndReceiveFromQueueTest.php +++ b/pkg/gearman/Tests/Spec/GearmanSendToAndReceiveFromQueueTest.php @@ -1,7 +1,8 @@ Date: Wed, 5 Jul 2017 17:10:01 +0300 Subject: [PATCH 12/15] fix tests. --- .../Tests/Functional/App/AsyncListener.php | 32 +------------------ 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php index 13a014dab..b55525fc4 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php +++ b/pkg/enqueue-bundle/Tests/Functional/App/AsyncListener.php @@ -7,7 +7,7 @@ use Enqueue\Client\ProducerInterface; use Symfony\Component\EventDispatcher\Event; -class AsyncListener +class AsyncListener extends \Enqueue\AsyncEventDispatcher\AsyncListener { /** * @var ProducerInterface @@ -19,8 +19,6 @@ class AsyncListener */ private $registry; - private $syncMode = []; - /** * @param ProducerInterface $producer * @param Registry $registry @@ -31,34 +29,6 @@ public function __construct(ProducerInterface $producer, Registry $registry) $this->registry = $registry; } - public function __invoke(Event $event, $eventName) - { - $this->onEvent($event, $eventName); - } - - public function resetSyncMode() - { - $this->syncMode = []; - } - - /** - * @param string $eventName - */ - public function syncMode($eventName) - { - $this->syncMode[$eventName] = true; - } - - /** - * @param string $eventName - * - * @return bool - */ - public function isSyncMode($eventName) - { - return isset($this->syncMode[$eventName]); - } - /** * @param Event $event * @param string $eventName From 88fa91e00c4f1937c958648b1fb598c2290012c8 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 18:52:34 +0300 Subject: [PATCH 13/15] phpstan fix --- .travis.yml | 4 +--- phpstan.neon | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 11c35b087..b7bb4f471 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,9 +55,7 @@ install: script: # misssing pkg/amqp-ext pkg/job-queue pkg/redis - - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/enqueue pkg/psr-queue pkg/fs pkg/simple-client; fi - - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/stomp pkg/dbal pkg/enqueue-bundle pkg/null pkg/sqs pkg/test; fi - - if [ "$PHPSTAN" = true ]; then ./bin/phpstan analyse -l 1 -c phpstan.neon pkg/gearman pkg/pheanstalk pkg/async-event-dispatcher; fi + - if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/psr-queue pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi - if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi - if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi - if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi diff --git a/phpstan.neon b/phpstan.neon index 09b247b81..d606893b6 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,4 +1,8 @@ parameters: excludes_analyse: - pkg/enqueue/Util/UUID.php - - pkg/enqueue-bundle/Tests/Functional/App \ No newline at end of file + - pkg/enqueue-bundle/Tests/Functional/App + - pkg/job-queue/Test/JobRunner.php + - pkg/job-queue/Tests/Functional/app/AppKernel.php + - pkg/redis/PhpRedis.php + - pkg/redis/RedisConnectionFactory.php \ No newline at end of file From 70601b5e014a75d095d9e0d1d179fb6adc3a08b3 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 19:30:52 +0300 Subject: [PATCH 14/15] do not run phpstan on gearman --- phpstan.neon | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/phpstan.neon b/phpstan.neon index d606893b6..644d0826b 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -5,4 +5,5 @@ parameters: - pkg/job-queue/Test/JobRunner.php - pkg/job-queue/Tests/Functional/app/AppKernel.php - pkg/redis/PhpRedis.php - - pkg/redis/RedisConnectionFactory.php \ No newline at end of file + - pkg/redis/RedisConnectionFactory.php + - pkg/gearman \ No newline at end of file From 9e2054cb4db00fa75c424667d6aebb9185c31885 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 5 Jul 2017 19:45:18 +0300 Subject: [PATCH 15/15] fix phpstan --- phpstan.neon | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/phpstan.neon b/phpstan.neon index 644d0826b..8dd052e94 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -6,4 +6,5 @@ parameters: - pkg/job-queue/Tests/Functional/app/AppKernel.php - pkg/redis/PhpRedis.php - pkg/redis/RedisConnectionFactory.php - - pkg/gearman \ No newline at end of file + - pkg/gearman + - pkg/amqp-ext/AmqpConsumer.php \ No newline at end of file