Conversation
samdark
commented
Apr 18, 2026
| Q | A |
|---|---|
| Is bugfix? | ✔️/❌ |
| New feature? | ✔️/❌ |
| Breaks BC? | ✔️/❌ |
| Tests pass? | ✔️/❌ |
| Fixed issues | comma-separated list of tickets # fixed by the PR, if any |
There was a problem hiding this comment.
Pull request overview
This PR introduces a marker interface to identify adapters whose messages should be handled immediately after Queue::push(), and updates the built-in SynchronousAdapter to use that behavior instead of buffering messages for later processing.
Changes:
- Added
ImmediateProcessingAdapterInterfaceas a marker for “handle immediately after push” adapters. - Updated
Queue::push()to invoke the worker immediately when the push adapter implements the marker interface. - Refactored
SynchronousAdapterto implement the marker interface and stop buffering messages.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
src/Queue.php |
Triggers immediate handling in push() when the adapter is marked as immediate-processing. |
src/Adapter/SynchronousAdapter.php |
Converts adapter to immediate-processing model and removes buffering/flush-on-destruct behavior. |
src/Adapter/ImmediateProcessingAdapterInterface.php |
Adds a marker interface to opt adapters into immediate handling. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public function push(MessageInterface $message): MessageInterface | ||
| { | ||
| $key = count($this->messages) + $this->current; | ||
| $this->messages[] = $message; | ||
| $key = $this->processed; | ||
| $this->processed++; | ||
|
|
||
| return new IdEnvelope($message, $key); |
There was a problem hiding this comment.
$processed is incremented in push() before the message is actually handled (handling happens in Queue::push()). If Worker::process() throws, callers could still observe status(id) === DONE even though processing failed. Consider updating the design so the adapter’s DONE accounting is updated only after successful processing (e.g., a post-handle hook / explicit acknowledgement), or adjust status() semantics/documentation to avoid reporting DONE on failed pushes.
| if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) { | ||
| $this->handle($message); | ||
| } |
There was a problem hiding this comment.
Immediate-processing path calls $this->handle($message), but handle() always calls $this->loop->canContinue(). Since the return value is ignored here, this adds unnecessary coupling/side effects (e.g., memory-limit checks) during push. Consider calling $this->worker->process($message, $queue) directly (or introducing a dedicated method) to avoid invoking the loop for producer-side immediate handling.
| ); | ||
|
|
||
| if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) { | ||
| $this->handle($message); |
There was a problem hiding this comment.
The adapter used for the push is taken from the middleware-dispatched PushRequest, but immediate processing passes $this (which still holds the original queue adapter) into Worker::process(). If any push middleware swaps the adapter via PushRequest::withAdapter(), handlers will run with a Queue instance configured with the wrong adapter. Consider processing using a queue instance that has $request->getAdapter() attached (e.g., clone via withAdapter() when it differs).
| $this->handle($message); | |
| $queue = $this; | |
| if ($request->getAdapter() !== $this->adapter) { | |
| $queue = clone $this; | |
| $queue->adapter = $request->getAdapter(); | |
| } | |
| $queue->handle($message); |
| final class SynchronousAdapter implements AdapterInterface, ImmediateProcessingAdapterInterface | ||
| { | ||
| private array $messages = []; | ||
| private int $current = 0; | ||
|
|
||
| public function __construct( | ||
| private readonly WorkerInterface $worker, | ||
| private readonly QueueInterface $queue, | ||
| ) {} | ||
|
|
||
| public function __destruct() | ||
| { | ||
| $this->runExisting(function (MessageInterface $message): bool { | ||
| $this->worker->process($message, $this->queue); | ||
|
|
||
| return true; | ||
| }); | ||
| } | ||
| private int $processed = 0; | ||
|
|
||
| public function runExisting(callable $handlerCallback): void | ||
| { | ||
| $result = true; | ||
| while (isset($this->messages[$this->current]) && $result === true) { | ||
| $result = $handlerCallback($this->messages[$this->current]); | ||
| unset($this->messages[$this->current]); | ||
| $this->current++; | ||
| } | ||
| // Messages are handled immediately in Queue::push(). | ||
| } |
There was a problem hiding this comment.
SynchronousAdapter’s constructor dependencies were removed and processing responsibility moved to Queue::push() via the new marker interface. This is a backward-incompatible change for any code instantiating SynchronousAdapter with (WorkerInterface, QueueInterface) as currently done in this repo’s tests/docs, and will cause runtime/compile errors unless all call sites are updated or a BC shim (e.g., optional constructor parameters or a named constructor) is kept.