From 5adc5528571de44cadbf753f2be3007410e295ea Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 27 Oct 2025 14:08:41 +0300 Subject: [PATCH 1/2] Abandon all pending operations when connection was closed by the server --- src/Channel.php | 11 ++++++++++- .../Cancellation/CancellationStorage.php | 11 +++++++++++ src/Internal/Cancellation/Canceller.php | 16 ++++++++++++++++ src/Internal/QueueIterator.php | 5 +++++ src/Iterator.php | 2 ++ 5 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/Channel.php b/src/Channel.php index b25833c..ab5fac1 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -284,6 +284,14 @@ public function consumeIterator( /** @var Internal\QueueIterator $iterator */ $iterator = Internal\QueueIterator::buffered($consumerTag, $this, $size); + $canceller = new Canceller( + $iterator->complete(...), + $iterator->cancel(...), + $iterator->abandon(...), + ); + + $this->cancellations->add($consumerTag, $canceller); + $this->consume( callback: $iterator->push(...), queue: $queue, @@ -339,6 +347,7 @@ public function consumeBatch( $canceller = new Canceller( $iterator->complete(...), $iterator->cancel(...), + $iterator->abandon(...), ); $this->cancellations->add($consumerTag, $canceller); @@ -782,7 +791,7 @@ public function abandon(\Throwable $e): void { $this->hooks->reject($this->channelId, $e); $this->hooks->unsubscribe($this->channelId); - $this->cancellations->cancelAll(error: $e); + $this->cancellations->abandon($e); $this->supervisor->stop(); $this->closed = new Sync\Once(static fn(): bool => true); } diff --git a/src/Internal/Cancellation/CancellationStorage.php b/src/Internal/Cancellation/CancellationStorage.php index 336fcea..7c454d4 100644 --- a/src/Internal/Cancellation/CancellationStorage.php +++ b/src/Internal/Cancellation/CancellationStorage.php @@ -33,6 +33,17 @@ public function cancel(string $consumerTag, bool $noWait = false, ?\Throwable $e } } + public function abandon(\Throwable $e): void + { + try { + foreach ($this->cancellers as $canceller) { + $canceller->abandon($e); + } + } finally { + $this->cancellers = []; + } + } + public function cancelAll(bool $noWait = false, ?\Throwable $error = null): void { try { diff --git a/src/Internal/Cancellation/Canceller.php b/src/Internal/Cancellation/Canceller.php index fa99963..4c26538 100644 --- a/src/Internal/Cancellation/Canceller.php +++ b/src/Internal/Cancellation/Canceller.php @@ -19,10 +19,12 @@ final class Canceller /** * @param callable(bool): void $complete * @param callable(\Throwable, bool): void $cancel + * @param callable(\Throwable): void $abandon */ public function __construct( private readonly mixed $complete, private readonly mixed $cancel, + private readonly mixed $abandon, ) { $this->deferred = new DeferredCancellation(); } @@ -46,6 +48,20 @@ public function cancel(bool $noWait = false, ?\Throwable $e = null): void } } + public function abandon(\Throwable $e): void + { + if ($this->cancelled) { + return; + } + + try { + ($this->abandon)($e); + $this->deferred->cancel(); + } finally { + $this->cancelled = true; + } + } + public function cancellation(): Cancellation { return $this->deferred->getCancellation(); diff --git a/src/Internal/QueueIterator.php b/src/Internal/QueueIterator.php index 72c51a2..4609130 100644 --- a/src/Internal/QueueIterator.php +++ b/src/Internal/QueueIterator.php @@ -66,6 +66,11 @@ public function cancel(\Throwable $e, bool $noWait = false): void $this->queue->error($e); } + public function abandon(\Throwable $e): void + { + $this->queue->error($e); + } + public function continue(?Cancellation $cancellation = null): bool { return $this->iterator->continue($cancellation); diff --git a/src/Iterator.php b/src/Iterator.php index cdc7d1e..c6b32a8 100644 --- a/src/Iterator.php +++ b/src/Iterator.php @@ -24,6 +24,8 @@ public function complete(bool $noWait = false): void; */ public function cancel(\Throwable $e, bool $noWait = false): void; + public function abandon(\Throwable $e): void; + /** * @throws CancelledException */ From 8b36264c4bc44fc6d72e58791ad0dc69b93a185c Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 27 Oct 2025 14:54:55 +0300 Subject: [PATCH 2/2] Dont complete already completed iterator --- composer.json | 2 +- src/Internal/QueueIterator.php | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/composer.json b/composer.json index 59c128f..4df2bf6 100644 --- a/composer.json +++ b/composer.json @@ -84,7 +84,7 @@ "fixcs": "tools/php-cs-fixer/vendor/bin/php-cs-fixer fix --diff --verbose", "infection": "tools/infection/vendor/bin/infection --show-mutations", "normalize": "@composer bin composer-normalize normalize --diff ../../composer.json", - "phpstan": "tools/phpstan/vendor/bin/phpstan analyze", + "phpstan": "tools/phpstan/vendor/bin/phpstan analyze --memory-limit=512M", "pre-command-run": "mkdir -p var", "psalm": "tools/psalm/vendor/bin/psalm --show-info --no-diff --no-cache", "rector": "tools/rector/vendor/bin/rector process", diff --git a/src/Internal/QueueIterator.php b/src/Internal/QueueIterator.php index 4609130..c5d19a2 100644 --- a/src/Internal/QueueIterator.php +++ b/src/Internal/QueueIterator.php @@ -56,19 +56,25 @@ public function push(mixed $delivery): void public function complete(bool $noWait = false): void { - $this->channel->cancel($this->consumerTag, $noWait); - $this->queue->complete(); + if (!$this->queue->isComplete()) { + $this->queue->complete(); + $this->channel->cancel($this->consumerTag, $noWait); + } } public function cancel(\Throwable $e, bool $noWait = false): void { - $this->channel->cancel($this->consumerTag, $noWait); - $this->queue->error($e); + if (!$this->queue->isComplete()) { + $this->queue->error($e); + $this->channel->cancel($this->consumerTag, $noWait); + } } public function abandon(\Throwable $e): void { - $this->queue->error($e); + if (!$this->queue->isComplete()) { + $this->queue->error($e); + } } public function continue(?Cancellation $cancellation = null): bool