diff --git a/README.md b/README.md index 8745eb2b..4e88167c 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ An extension for running tasks asynchronously via queues. ## Requirements - PHP 8.1 or higher. +- PCNTL extension for signal handling _(optional, recommended for production use)_. ## Installation @@ -28,288 +29,104 @@ The package could be installed with [Composer](https://getcomposer.org): composer require yiisoft/queue ``` -## Ready for Yii Config +## Quick Start -If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults -in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should -change to start working with the queue: +### 1. Install an adapter -- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation. -- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used - with the [queue provider](#different-queue-channels). -- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`. -- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). +For production use, you should install an adapter package that matches your message broker ([AMQP](https://github.com/yiisoft/queue-amqp), [Kafka](https://github.com/g41797/queue-kafka), [NATS](https://github.com/g41797/queue-nats), and [others](docs/guide/en/adapter-list.md)). +See the [adapter list](docs/guide/en/adapter-list.md) and follow the adapter-specific documentation. -## Differences to yii2-queue +> For development and testing, you can start without an external broker using the built-in [`SynchronousAdapter`](docs/guide/en/adapter-sync.md). +> This adapter processes messages immediately in the same process, so it won't provide true async execution, +> but it's useful for getting started and writing tests. -If you have experience with `yiisoft/yii2-queue`, you will find out that this package is similar. -Though, there are some key differences that are described in the "[migrating from yii2-queue](docs/guide/migrating-from-yii2-queue.md)" -article. +### 2. Configure the queue -## General usage +#### Configuration with [yiisoft/config](https://github.com/yiisoft/config) -Each queue task consists of two parts: +**If you use [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api)** -1. A message is a class implementing `MessageInterface`. For simple cases you can use the default implementation, - `Yiisoft\Queue\Message\Message`. For more complex cases, you should implement the interface by your own. -2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message. +Add queue configuration to your application `$params` config. In [yiisoft/app](https://github.com/yiisoft/app)/[yiisoft/app-api](https://github.com/yiisoft/app-api) templates it's typically the `config/params.php` file. +_If your project structure differs, put it into any params config file that is loaded by [yiisoft/config](https://github.com/yiisoft/config)._ -For example, if you need to download and save a file, your message creation may look like the following: -- Message handler as the first parameter -- Message data as the second parameter +Minimal configuration example: ```php -$data = [ - 'url' => $url, - 'destinationFile' => $filename, +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], ]; -$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data); ``` -Then you should push it to the queue: +[Advanced configuration with `yiisoft/config`](docs/guide/en/configuration-with-config.md) -```php -$queue->push($message); -``` +#### Manual configuration -Its handler may look like the following: +For setting up all classes manually, see the [Manual configuration](docs/guide/en/configuration-manual.md) guide. + +### 3. Prepare a handler + +You need to create a handler class that will process the queue messages. The most simple way is to implement the `HandleInterface`. Let's create an example for remote file processing: ```php -class FileDownloader +use Yiisoft\Queue\Handler\HandleInterface; +use Yiisoft\Queue\Message\MessageInterface; + +final readonly class RemoteFileHandler implements HandleInterface { private string $absolutePath; - public function __construct(string $absolutePath) - { - $this->absolutePath = $absolutePath; - } + // These dependencies will be resolved on handler creation by the DI container + public function __construct( + private FileDownloader $downloader, + private FileProcessor $processor, + ) {} - public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void + // Every received message will be processed by this method + public function handle(MessageInterface $downloadMessage): void { $fileName = $downloadMessage->getData()['destinationFile']; - $path = "$this->absolutePath/$fileName"; - file_put_contents($path, file_get_contents($downloadMessage->getData()['url'])); + $localPath = $this->downloader->download($fileName); + $this->processor->process($localPath); } } ``` -The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`: - -```php -$worker = new \Yiisoft\Queue\Worker\Worker( - [], - $logger, - $injector, - $container -); -``` - -There is a way to run all the messages that are already in the queue, and then exit: - -```php -$queue->run(); // this will execute all the existing messages -$queue->run(10); // while this will execute only 10 messages as a maximum before exit -``` - -If you don't want your script to exit immediately, you can use the `listen` method: - -```php -$queue->listen(); -``` - -You can also check the status of a pushed message (the queue adapter you are using must support this feature): - -```php -$queue->push($message); -$id = $message->getId(); - -// Get status of the job -$status = $queue->status($id); - -// Check whether the job is waiting for execution. -$status->isWaiting(); - -// Check whether a worker got the job from the queue and executes it. -$status->isReserved(); - -// Check whether a worker has executed the job. -$status->isDone(); -``` - -## Custom handler names -### Custom handler names - -By default, when you push a message to the queue, the message handler name is the fully qualified class name of the handler. -This can be useful for most cases, but sometimes you may want to use a shorter name or arbitrary string as the handler name. -This can be useful when you want to reduce the amount of data being passed or when you communicate with external systems. - -To use a custom handler name before message push, you can pass it as the first argument `Message` when creating it: -```php -new Message('handler-name', $data); -``` - -To use a custom handler name on message consumption, you should configure handler mapping for the `Worker` class: -```php -$worker = new \Yiisoft\Queue\Worker\Worker( - ['handler-name' => FooHandler::class], - $logger, - $injector, - $container -); -``` - -## Different queue channels - -Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface` -interface that provides different `Queue` objects creation for different channels. With implementation of this interface -channel-specific `Queue` creation is as simple as - -```php -$queue = $provider->get('channel-name'); -``` - -Out of the box, there are four implementations of the `QueueProviderInterface`: +### 4. Send (produce/push) a message to a queue -- `AdapterFactoryQueueProvider` -- `PrototypeQueueProvider` -- `CompositeQueueProvider` - -### `AdapterFactoryQueueProvider` - -Provider based on the definition of channel-specific adapters. Definitions are passed in -the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions -for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples: +To send a message to the queue, you need to get the queue instance and call the `push()` method. Typically, with Yii Framework you'll get a `Queue` instance as a dependency of a service. ```php -use Yiisoft\Queue\Adapter\SynchronousAdapter; - -[ - 'channel1' => new SynchronousAdapter(), - 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), - 'channel3' => [ - 'class' => SynchronousAdapter::class, - '__constructor' => ['channel' => 'channel3'], - ], -] -``` - -For more information about the definition formats available, see the [factory](https://github.com/yiisoft/factory) documentation. - -### `PrototypeQueueProvider` - -Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same -adapter. - -> Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes -> in channel names. - -### `CompositeQueueProvider` - -This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the -order they are passed to the constructor. The first queue found will be returned. - -## Console execution - -The exact way of task execution depends on the adapter used. Most adapters can be run using -console commands, which the component automatically registers in your application. - -The following command obtains and executes tasks in a loop until the queue is empty: - -```sh -yii queue:run -``` - -The following command launches a daemon which infinitely queries the queue: - -```sh -yii queue:listen -``` - -See the documentation for more details about adapter specific console commands and their options. - -The component can also track the status of a job which was pushed into queue. - -For more details, see [the guide](docs/guide/en/README.md). -## Middleware pipelines - -Any message pushed to a queue or consumed from it passes through two different middleware pipelines: one pipeline -on message push and another - on a message consume. The process is the same as for the HTTP request, but it is executed -twice for a queue message. That means you can add extra functionality on message pushing and consuming with configuration -of the two classes: `PushMiddlewareDispatcher` and `ConsumeMiddlewareDispatcher` respectively. - -You can use any of these formats to define a middleware: - -- A ready-to-use middleware object: `new FooMiddleware()`. It must implement `MiddlewarePushInterface`, - `MiddlewareConsumeInterface` or `MiddlewareFailureInterface` depending on the place you use it. -- An array in the format of [yiisoft/definitions](https://github.com/yiisoft/definitions). - **Only if you use yiisoft/definitions and yiisoft/di**. -- A `callable`: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the -[yiisoft/injector](https://github.com/yiisoft/injector), so all the dependencies of your callable will be resolved. -- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class` - -Middleware will be executed forwards in the same order they are defined. If you define it like the following: -`[$middleware1, $midleware2]`, the execution will look like this: - -```mermaid -graph LR - StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue) - -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1] - PushMiddleware1[$middleware1] -.-> EndPush((End)) +final readonly class Foo { + public function __construct(private QueueInterface $queue) {} - - StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle) - -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1] - ConsumeMiddleware1[$middleware1] -.-> EndConsume((End)) + public function bar(): void + { + $this->queue->push(new Message( + // The first parameter is the handler name that will process this concrete message + RemoteFileHandler::class, + // The second parameter is the data that will be passed to the handler. + // It should be serializable to JSON format + ['destinationFile' => 'https://example.com/file-path.csv'], + )); + } +} ``` -### Push a pipeline - -When you push a message, you can use middlewares to modify both message and queue adapter. -With message modification you can add extra data, obfuscate data, collect metrics, etc. -With queue adapter modification you can redirect the message to another queue, delay message consuming, and so on. +### 5. Handle queued messages -To use this feature, you have to create a middleware class, which implements `MiddlewarePushInterface`, and -return a modified `PushRequest` object from the `processPush` method: +By default, Yii Framework uses [yiisoft/yii-console](https://github.com/yiisoft/yii-console) to run CLI commands. If you installed [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api), you can run the queue worker with on of these two commands: -```php -return $pushRequest->withMessage($newMessage)->withAdapter($newAdapter); +```bash +./yii queue:run # Handle all existing messages in the queue +./yii queue:listen # Start a daemon listening for new messages permanently ``` -With push middlewares you can define an adapter object at the runtime, not in the `Queue` constructor. -There is a restriction: by the time all middlewares are executed in the forward order, the adapter must be specified -in the `PushRequest` object. You will get a `AdapterNotConfiguredException`, if it isn't. - -You have three places to define push middlewares: - -1. `PushMiddlewareDispatcher`. You can pass it either to the constructor, or to the `withMiddlewares()` method, which -creates a completely new dispatcher object with only those middlewares, which are passed as arguments. -If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-push` key of the -`yiisoft/queue` array in the `params`. -2. Pass middlewares to either `Queue::withMiddlewares()` or `Queue::withMiddlewaresAdded()` methods. The difference is -that the former will completely replace an existing middleware stack, while the latter will add passed middlewares to -the end of the existing stack. These middlewares will be executed after the common ones, passed directly to the -`PushMiddlewareDispatcher`. It's useful when defining a queue channel. Both methods return a new instance of the `Queue` -class. -3. Put middlewares into the `Queue::push()` method like this: `$queue->push($message, ...$middlewares)`. These -middlewares have the lowest priority and will be executed after those which are in the `PushMiddlewareDispatcher` and -the ones passed to the `Queue::withMiddlewares()` and `Queue::withMiddlewaresAdded()` and only for the message passed -along with them. - -### Consume pipeline - -You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In a pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Except push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-consume` key of the `yiisoft/queue` array in the `params`. - -### Error handling pipeline - -Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with a Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines: - -- You should set up the middleware pipeline separately for each queue channel. That means, the format should be `['channel-name' => [FooMiddleware::class]]` instead of `[FooMiddleware::class]`, like for the other two pipelines. There is also a default key, which will be used for those channels without their own one: `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`. -- The last middleware will throw the exception, which will come with the `FailureHandlingRequest` object. If you don't want the exception to be thrown, your middlewares should `return` a request without calling `$handler->handleFailure()`. - -You can declare error handling a middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-fail` key of the `yiisoft/queue` array in the `params`. - -See [error handling docs](docs/guide/error-handling.md) for details. +> In case you're using the `SynchronosAdapter` for development purposes, you should not use these commands, as you have no asynchronous processing available. The messages are processed immediately when pushed. ## Documentation diff --git a/config/params.php b/config/params.php index 086f8f70..c2670611 100644 --- a/config/params.php +++ b/config/params.php @@ -18,6 +18,7 @@ 'commands' => [ 'queue:run' => RunCommand::class, 'queue:listen' => ListenCommand::class, + 'queue:listen-all' => ListenAllCommand::class, 'queue:listen:all' => ListenAllCommand::class, ], ], diff --git a/docs/guide/en/README.md b/docs/guide/en/README.md index 564b8ca9..72268714 100644 --- a/docs/guide/en/README.md +++ b/docs/guide/en/README.md @@ -2,10 +2,39 @@ An extension for running tasks asynchronously via queues. -## Guides and concept explanations +## Getting started + +- [Prerequisites and installation](prerequisites-and-installation.md) +- [Configuration with yiisoft/config](configuration-with-config.md) +- [Manual configuration](configuration-manual.md) - [Usage basics](usage.md) -- [Migrating from `yii2-queue`](migrating-from-yii2-queue.md) -- [Errors and retryable jobs](error-handling.md) - [Workers](worker.md) +- [Console commands](console-commands.md) + +## Adapters + - [Adapter list](adapter-list.md) +- [Synchronous adapter](adapter-sync.md) + +## Core concepts + +- [Queue channels](channels.md) +- [Message handler](message-handler.md) +- [Envelopes](envelopes.md) +- [Middleware pipelines](middleware-pipelines.md) +- [Loops](loops.md) + +## Interoperability + +- [Producing messages from external systems](producing-messages-from-external-systems.md) + +## Reliability and visibility + +- [Errors and retryable jobs](error-handling.md) +- [Job status](job-status.md) +- [Yii Debug integration](debug-integration.md) + +## Migration from Yii2 + +- [Migrating from `yii2-queue`](migrating-from-yii2-queue.md) diff --git a/docs/guide/en/callable-definitions-extended.md b/docs/guide/en/callable-definitions-extended.md new file mode 100644 index 00000000..9d64ba12 --- /dev/null +++ b/docs/guide/en/callable-definitions-extended.md @@ -0,0 +1,100 @@ +# Callable Definitions Extended + +Сallable definitions in `yiisoft/queue` extend [native PHP callables](https://www.php.net/manual/en/language.types.callable.php). That means, there are two types of definitions. Nevertheless, each of them may define dependency list in their parameter lists, which will be resolved via [yiisoft/injector](https://github.com/yiisoft/injector) and a DI Container. +It is used across the package to convert configuration definitions into real callables. + +## Type 1: Native PHP callable + +When you define a callable in a such manner, they are not modified in any way and are called as is. An only difference is that you can define dependency list in their parameter lists, which will be resolved via [yiisoft/injector](https://github.com/yiisoft/injector) and a DI Container. +As you can see in the [PHP documentation](https://www.php.net/manual/en/language.types.callable.php), there are several ways to define a native callable: + +- **Closure (lambda function)**. It may be static. Example: + ```php + $callable = static function(Update $update) { + // do stuff + } + ``` +- **First class callable**. It's a Closure too, BTW ;) Example: + ```php + $callable = trim(...); + $callable2 = $this->foo(...); + ``` +- **A class static function**. When a class has a static function, an array syntax may be used: + ```php + $callable = [Foo::class, 'bar']; // this will be called the same way as Foo::bar(); + ``` +- **An object method**. The same as above, but with an object and a non-static method: + ```php + $foo = new Foo(); + $callable = [$foo, 'bar']; // this will be called the same way as $foo->bar(); + ``` +- **A class static function as a string**. I don't recommend you to use this ability, as it's non-obvious and + hard to refactor, but it still exists: + ```php + $callable = 'Foo::bar'; // this will be called the same way as Foo::bar(); + ``` +- **A name of a named function**: + ```php + function foo() { + // do stuff + } + $callable = 'foo'; + $callable2 = 'array_map'; + ``` +- **Callable objects**. An object with [the `__invoke` method](https://www.php.net/manual/en/language.oop5.magic.php#object.invoke) implemented: + ```php + class Foo + { + public function __invoke() + { + // do stuff + } + } + + $callable = new Foo(); + ``` + +## Type 2: Callable definition extensions (via container) + +Under the hood, this extension behaves exactly like the **Type 1** ones. But there is a major difference too: +all the objects are instantiated automatically with a PSR-11 DI Container with all their dependencies +and in a lazy way (only when they are really needed). +Ways to define an extended callable: + +- An object method through a class name or alias: + ```php + final readonly class Foo + { + public function __construct(private MyHeavyDependency $dependency) {} + + public function bar() + { + // do stuff + } + } + + $callable = [Foo::class, 'bar']; + ``` + Here is a simplified example of how it works: + ```php + if ($container->has($callable[0])) { + $callable[0] = $container->get($callable[0]) + } + + $callable(); + ``` +- Class name of an object with [the `__invoke` method](https://www.php.net/manual/en/language.oop5.magic.php#object.invoke) implemented: + ```php + $callable = Foo::class; + ``` + It works the same way as above: an object will be retrieved from a DI container and called as a function. + +_Note: you can use an alias registered in your DI Container instead of a class name._ This will also work if you have a "class alias" definition in container: +```php +$callable = 'class alias'; // for a "callable object" +$callable2 = ['class alias', 'foo']; // to call "foo" method of an object found by "class alias" in DI Container +``` + +## Invalid definitions + +The factory throws `Yiisoft\Queue\Middleware\InvalidCallableConfigurationException` when it cannot create a callable (for example: `null`, unsupported array format, missing method, container entry is not callable). diff --git a/docs/guide/en/channels.md b/docs/guide/en/channels.md new file mode 100644 index 00000000..1dc6d365 --- /dev/null +++ b/docs/guide/en/channels.md @@ -0,0 +1,210 @@ +# Queue channels + +A *queue channel* is a named queue configuration. + +In practice, a channel is a string (for example, `yii-queue`, `emails`, `critical`) that selects which queue backend (adapter) messages are pushed to and which worker consumes them. + +Having multiple channels is useful when you want to separate workloads, for example: + +- **Different priorities**: `critical` vs `low`. +- **Different message types**: `emails`, `reports`, `webhooks`. +- **Different backends / connections**: fast Redis queue for short jobs and a different backend for long-running jobs. + +The default channel name is `Yiisoft\Queue\QueueInterface::DEFAULT_CHANNEL` (`yii-queue`). + +## How channels are used in the code + +- A channel name is passed to `Yiisoft\Queue\Provider\QueueProviderInterface::get($channel)`. +- The provider returns a `Yiisoft\Queue\QueueInterface` instance bound to that channel. +- Internally, the provider creates an adapter instance and calls `AdapterInterface::withChannel($channel)`. + +In other words, a channel is the key that lets the application select a particular adapter instance/configuration. + +## Choosing a channel at runtime + +### In CLI + +These built-in commands accept channel names: + +- `queue:listen [channel]` listens to a single channel (defaults to `yii-queue`). +- `queue:run [channel1 [channel2 [...]]]` processes existing messages and exits. +- `queue:listen-all [channel1 [channel2 [...]]]` iterates over multiple channels (meant mostly for development). + +Examples: + +```sh +yii queue:listen emails +yii queue:run critical emails --maximum=100 +yii queue:listen-all critical emails --pause=1 --maximum=500 +``` + +### In PHP code + +When you have a `QueueProviderInterface`, request a queue by channel name: + +```php +/** @var \Yiisoft\Queue\Provider\QueueProviderInterface $provider */ + +$emailsQueue = $provider->get('emails'); +$emailsQueue->push(new \Yiisoft\Queue\Message\Message('send-email', ['to' => 'user@example.com'])); +``` + +You can also check if a channel exists before trying to get it: + +```php +if ($provider->has('emails')) { + $emailsQueue = $provider->get('emails'); +} +``` + +`QueueProviderInterface` accepts both strings and `BackedEnum` values (they are normalized to a string channel name). + +`QueueProviderInterface::get()` may throw: + +- `Yiisoft\Queue\Provider\ChannelNotFoundException` +- `Yiisoft\Queue\Provider\InvalidQueueConfigException` +- `Yiisoft\Queue\Provider\QueueProviderException` + +## Providers + +`QueueProviderInterface` is the component responsible for returning a `QueueInterface` instance bound to a particular channel. + +Out of the box, this package provides three implementations: + +- `Yiisoft\Queue\Provider\AdapterFactoryQueueProvider` +- `Yiisoft\Queue\Provider\PrototypeQueueProvider` +- `Yiisoft\Queue\Provider\CompositeQueueProvider` + +### `AdapterFactoryQueueProvider` + +This provider creates channel-specific `QueueInterface` instances based on adapter definitions. + +It uses [`yiisoft/factory`](https://github.com/yiisoft/factory) to resolve adapter definitions. + +This approach is recommended when you want: + +- Separate configuration per channel. +- Stronger validation (unknown channels are not silently accepted). + +### `PrototypeQueueProvider` + +This provider always returns a queue by taking a base queue + base adapter and only changing the channel name. + +This can be useful when all channels use the same adapter and only differ by channel name. + +This strategy is not recommended as it does not give you any protection against typos and mistakes in channel names. + +Example: + +```php +use Yiisoft\Queue\Provider\PrototypeQueueProvider; + +$provider = new PrototypeQueueProvider($queue, $adapter); + +$queueForEmails = $provider->get('emails'); +$queueForCritical = $provider->get('critical'); +``` + +### `CompositeQueueProvider` + +This provider combines multiple providers into one. + +It tries to resolve a channel by calling `has()`/`get()` on each provider in the order they are passed to the constructor. +The first provider that reports it has the channel wins. + +Example: + +```php +use Yiisoft\Queue\Provider\CompositeQueueProvider; + +$provider = new CompositeQueueProvider( + $providerA, + $providerB, +); + +$queue = $provider->get('emails'); +``` + +## Configuration with yiisoft/config + + When using [yiisoft/config](https://github.com/yiisoft/config), channel configuration is stored in params under `yiisoft/queue.channels`. + + By default, `QueueProviderInterface` is bound to `AdapterFactoryQueueProvider`. + That makes `yiisoft/queue.channels` a strict channel registry: + + - `QueueProviderInterface::has($channel)` checks whether the channel exists in definitions. + - `QueueProviderInterface::get($channel)` throws `ChannelNotFoundException` for unknown channels. + + The same channel list is used by `queue:run` and `queue:listen-all` as the default set of channels to process. + + It is a map: + +- key: channel name +- value: adapter definition that should be resolved for that channel + +Minimal example (single channel): + +```php +use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\QueueInterface; + +return [ + 'yiisoft/queue' => [ + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL => AdapterInterface::class, + ], + ], +]; +``` + +Multiple channels example: + +```php +use Yiisoft\Queue\QueueInterface; + +return [ + 'yiisoft/queue' => [ + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL => \Yiisoft\Queue\Adapter\AdapterInterface::class, + 'critical' => \Yiisoft\Queue\Adapter\AdapterInterface::class, + 'emails' => \Yiisoft\Queue\Adapter\AdapterInterface::class, + ], + ], +]; +``` + +The exact adapter definitions depend on which queue adapter package you use (Redis, AMQP, etc.). + +When using the default DI config from this package, the configured channel names are also used as the default channel list for `queue:run` and `queue:listen-all`. + +## Manual configuration (without yiisoft/config) + +For multiple channels without `yiisoft/config`, you can create a provider manually. + +`AdapterFactoryQueueProvider` accepts adapter definitions indexed by channel names and returns a `QueueInterface` for a channel on demand: + +```php +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; +use Yiisoft\Queue\Adapter\SynchronousAdapter; + +$definitions = [ + 'channel1' => new SynchronousAdapter($worker, $queue), + 'channel2' => static fn (SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), + 'channel3' => [ + 'class' => SynchronousAdapter::class, + '__constructor' => ['channel' => 'channel3'], + ], +]; + +$provider = new AdapterFactoryQueueProvider( + $queue, + $definitions, + $container, +); + +$queueForChannel1 = $provider->get('channel1'); +$queueForChannel2 = $provider->get('channel2'); +$queueForChannel3 = $provider->get('channel3'); +``` + +For more information about the definition formats available, see the [`yiisoft/factory` documentation](https://github.com/yiisoft/factory). diff --git a/docs/guide/en/configuration-manual.md b/docs/guide/en/configuration-manual.md new file mode 100644 index 00000000..08c88fea --- /dev/null +++ b/docs/guide/en/configuration-manual.md @@ -0,0 +1,120 @@ +# Manual Configuration (without [yiisoft/config](https://github.com/yiisoft/config)) + +This guide explains how to set up the queue component manually, without using [yiisoft/config](https://github.com/yiisoft/config). + +## Basic setup + +To use the queue, you need to create instances of the following classes: + +1. **Adapter** - handles the actual queue backend (e.g., `SynchronousAdapter`, or an adapter from external packages like Redis, AMQP, etc.) +2. **Worker** - processes messages from the queue +3. **Queue** - the main entry point for pushing messages + +### Example + +```php +use Yiisoft\Queue\Adapter\SynchronousAdapter; +use Yiisoft\Queue\Queue; +use Yiisoft\Queue\Worker\Worker; +use Yiisoft\Queue\Middleware\CallableFactory; +use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume; +use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailure; +use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Psr\Container\ContainerInterface; + +// You need a PSR-11 container for dependency injection +/** @var ContainerInterface $container */ + +// Define message handlers +$handlers = [ + 'file-download' => [FileDownloader::class, 'handle'], + FileDownloader::class => [FileDownloader::class, 'handle'], +]; + +$callableFactory = new CallableFactory($container); + +// Create middleware dispatchers +$consumeMiddlewareDispatcher = new ConsumeMiddlewareDispatcher( + new MiddlewareFactoryConsume($container, $callableFactory), +); + +$failureMiddlewareDispatcher = new FailureMiddlewareDispatcher( + new MiddlewareFactoryFailure($container, $callableFactory), + [], +); + +$pushMiddlewareDispatcher = new PushMiddlewareDispatcher( + new MiddlewareFactoryPush($container), +); + +// Create worker +$worker = new Worker( + $handlers, + $container->get(\Psr\Log\LoggerInterface::class), + $container->get(\Yiisoft\Injector\Injector::class), + $container, + $consumeMiddlewareDispatcher, + $failureMiddlewareDispatcher, + $callableFactory, +); + +// Create queue with adapter +$queue = new Queue( + $worker, + $pushMiddlewareDispatcher, + $container->get(\Psr\EventDispatcher\EventDispatcherInterface::class), + new SynchronousAdapter($worker, /* queue instance will be set via withAdapter */), +); + +// Now you can push messages +$message = new \Yiisoft\Queue\Message\Message('file-download', ['url' => 'https://example.com/file.pdf']); +$queue->push($message); +``` + +## Using Queue Provider + +For multiple queue channels, use `AdapterFactoryQueueProvider`: + +```php +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; +use Yiisoft\Queue\Adapter\SynchronousAdapter; + +$definitions = [ + 'channel1' => new SynchronousAdapter($worker, $queue), + 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), +]; + +$provider = new AdapterFactoryQueueProvider( + $queue, + $definitions, + $container, +); + +$queueForChannel1 = $provider->get('channel1'); +$queueForChannel2 = $provider->get('channel2'); +``` + +## Running the queue + +### Processing existing messages + +```php +$queue->run(); // Process all messages +$queue->run(10); // Process up to 10 messages +``` + +### Listening for new messages + +```php +$queue->listen(); // Run indefinitely +``` + +## Next steps + +- [Usage basics](usage.md) - learn how to create messages and handlers +- [Workers](worker.md) - understand handler formats +- [Error handling](error-handling.md) - configure retries and failure handling +- [Adapter list](adapter-list.md) - choose a production-ready adapter diff --git a/docs/guide/en/configuration-with-config.md b/docs/guide/en/configuration-with-config.md new file mode 100644 index 00000000..cf7b7ba2 --- /dev/null +++ b/docs/guide/en/configuration-with-config.md @@ -0,0 +1,60 @@ +# Configuration with [yiisoft/config](https://github.com/yiisoft/config) + +If you are using [yiisoft/config](https://github.com/yiisoft/config) (i.e. installed with [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api)), you'll find out this package has some defaults in the [`common`](../../../config/di.php) and [`params`](../../../config/params.php) configurations saving your time. + +## Where to put the configuration + +In [yiisoft/app](https://github.com/yiisoft/app)/[yiisoft/app-api](https://github.com/yiisoft/app-api) templates you typically add or adjust configuration in `config/params.php`. +If your project structure differs, put configuration into any params config file that is loaded by [yiisoft/config](https://github.com/yiisoft/config). + +## What you need to configure + +- Define queue channel adapter definitions in the `channels` params key. See more about channels [here](./channels.md). +- Optionally: define [message handlers](./message-handler.md) in the `handlers` params key to be used with the `QueueWorker`. +- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). + +By default, when using the DI config provided by this package, `QueueProviderInterface` is bound to `AdapterFactoryQueueProvider` and uses `yiisoft/queue.channels` as a strict channel registry. +That means unknown channels are not accepted silently and `QueueProviderInterface::get()` will throw `ChannelNotFoundException`. +The configured channel names are also used as the default channel list for `queue:run` and `queue:listen-all`. + +For development and testing you can start with the synchronous adapter. +For production you must use a real backend adapter (AMQP, Kafka, SQS, etc.). If you do not have any preference, start with [yiisoft/queue-amqp](https://github.com/yiisoft/queue-amqp) and [RabbitMQ](https://www.rabbitmq.com/). + +The examples below use the synchronous adapter for brevity. In production, override `yiisoft/queue.channels` with an adapter definition from the backend adapter package you selected. + +## Minimal configuration example + +If you use the handler class FQCN as the message handler name, no additional configuration is required. + +See [Message handler](./message-handler.md) for details and trade-offs. + +## Minimal configuration example (named handlers) + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], +]; +``` + +## Full configuration example + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + 'channels' => [ + \Yiisoft\Queue\QueueInterface::DEFAULT_CHANNEL => \Yiisoft\Queue\Adapter\SynchronousAdapter::class, + ], + 'middlewares-push' => [], // push middleware pipeline definition + 'middlewares-consume' => [], // consume middleware pipeline definition + 'middlewares-fail' => [], // consume failure handling middleware pipeline definition + ], +]; +``` +Middleware pipelines are discussed in detail [here](./middleware-pipelines.md). diff --git a/docs/guide/en/console-commands.md b/docs/guide/en/console-commands.md new file mode 100644 index 00000000..d3a94a76 --- /dev/null +++ b/docs/guide/en/console-commands.md @@ -0,0 +1,50 @@ +# Console commands + +Yii Queue provides several console commands for processing queued jobs. + +If you are using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), the commands are registered automatically. + +If you are using [symfony/console](https://github.com/symfony/console) directly, you should register the commands manually. + +In [yiisoft/app](https://github.com/yiisoft/app) the `yii` console binary is provided out of the box. +If you are using [yiisoft/console](https://github.com/yiisoft/console) or `symfony/console` without that template, invoke these commands the same way you invoke other console commands in your application. + +## 1. Run queued messages and exit + +The command `queue:run` obtains and executes tasks until the queue is empty, then exits. + +You can also narrow the scope of processed messages by specifying channel(s) and maximum number of messages to process: + +- Specify one or more channels to process. Messages from other channels will be ignored. Default is all registered channels (in case of using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), otherwise pass the default channel list to the command constructor). +- Use `--maximum` to limit the number of messages processed. When set, command will exit either when all the messages are processed or when the maximum count is reached. + +The full command signature is: +```sh +yii queue:run [channel1 [channel2 [...]]] --maximum=100 +``` + +## 2. Listen for queued messages and process them continuously + +The following command launches a daemon, which infinitely consumes messages from a single channel of the queue. This command receives an optional `channel` argument to specify which channel to listen to, defaults to the default channel `yii-queue`. + +```sh +yii queue:listen [channel] +``` + +## 3. Listen to multiple channels + +The following command iterates through multiple channels and is meant to be used in development environment only, as it consumes a lot of CPU for iterating through channels. You can pass to it: + +- `channel` argument(s). Specify one or more channels to process. Messages from other channels will be ignored. Default is all registered channels (in case of using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), otherwise pass the default channel list to the command constructor). +- `--maximum` option to limit the number of messages processed before switching to another channel. E.g. you set `--maximum` to 500 and right now you have 1000 messages in `channel1`. This command will consume only 500 of them, then it will switch to `channel2` to see if there are any messages there. Defaults to `0` (no limit). +- `--pause` option to specify the number of seconds to pause between checking channels when no messages are found. Defaults to `1`. + +The full command signature is: +```sh +yii queue:listen-all [channel1 [channel2 [...]]] --pause=1 --maximum=0 +``` + +> The command alias `queue:listen:all` is deprecated and will be removed in `1.0.0`, since it was a typo. + +For long-running processes, graceful shutdown is controlled by `LoopInterface`. When `ext-pcntl` is available, +the default `SignalLoop` handles signals such as `SIGTERM`/`SIGINT`. diff --git a/docs/guide/en/debug-integration.md b/docs/guide/en/debug-integration.md new file mode 100644 index 00000000..c6f785c6 --- /dev/null +++ b/docs/guide/en/debug-integration.md @@ -0,0 +1,53 @@ +# Yii Debug integration + +This package provides an integration with [yiisoft/yii-debug](https://github.com/yiisoft/yii-debug). + +When debug is enabled, it collects queue-related information and shows it in the Yii Debug panel. + +## What is collected + +The integration is based on `Yiisoft\Queue\Debug\QueueCollector`. + +It collects: + +- Pushed messages grouped by queue channel (including middleware definitions passed to `push()`). +- Job status checks performed via `QueueInterface::status()`. +- Messages processed by a worker grouped by queue channel. + +## How it works + +The integration is enabled by registering the collector and wrapping tracked services with proxy implementations. + +In this package defaults (see `config/params.php`), the following services are tracked: + +- `Yiisoft\Queue\Provider\QueueProviderInterface` is wrapped with `Yiisoft\Queue\Debug\QueueProviderInterfaceProxy`. + The proxy decorates returned queues with `Yiisoft\Queue\Debug\QueueDecorator` to collect `push()` and `status()` calls. +- `Yiisoft\Queue\Worker\WorkerInterface` is wrapped with `Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy` to collect message processing. + +Because of that, to see data in debug you should obtain `QueueProviderInterface` / `WorkerInterface` from the DI container. + +## Configuration + +If you use [yiisoft/config](https://github.com/yiisoft/config) and the configuration plugin, these defaults are loaded automatically from this package. + +Otherwise, you can configure it manually in your params configuration: + +```php +use Yiisoft\Queue\Debug\QueueCollector; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; +use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\Worker\WorkerInterface; + +return [ + 'yiisoft/yii-debug' => [ + 'collectors' => [ + QueueCollector::class, + ], + 'trackedServices' => [ + QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class], + WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class], + ], + ], +]; +``` diff --git a/docs/guide/en/envelopes.md b/docs/guide/en/envelopes.md new file mode 100644 index 00000000..5e46cc25 --- /dev/null +++ b/docs/guide/en/envelopes.md @@ -0,0 +1,62 @@ +# Envelopes + +An *envelope* is a message container that wraps another message and adds metadata. + +An envelope implements `Yiisoft\Queue\Message\EnvelopeInterface`, which itself extends `Yiisoft\Queue\Message\MessageInterface`. + +## How an envelope behaves + +An envelope acts like the wrapped message: + +- `getHandlerName()` is delegated to the wrapped message. +- `getData()` is delegated to the wrapped message. + +What an envelope adds is `getMetadata()`. + +## Metadata and envelope stacking + +Every envelope contributes its own metadata to the resulting metadata array. + +`Yiisoft\Queue\Message\Envelope` (base class) also maintains an envelope stack in message metadata under `EnvelopeInterface::ENVELOPE_STACK_KEY` (`"envelopes"`). + +When `getMetadata()` is called on an envelope, it returns: + +- the wrapped message metadata, +- plus an updated `"envelopes"` stack (previous stack + current envelope class), +- plus envelope-specific metadata. + +Because envelopes wrap other messages, multiple envelopes form a stack. + +## Creating envelopes + +To wrap a message into an envelope, envelope classes provide: + +- `EnvelopeInterface::fromMessage(MessageInterface $message): static` + +and, via `MessageInterface` inheritance, also support: + +- `Envelope::fromData(string $handlerName, mixed $data, array $metadata = []): static` + +## Restoring envelopes from metadata + +If metadata contains the `"envelopes"` key with an array of envelope class names, the serializer will try to rebuild the stack by wrapping the message with each envelope class in the given order. + +During this process: + +- The `"envelopes"` key is removed from the base message metadata (it is set to an empty array before creating the base message). +- Each envelope class from the list is applied to the message using `EnvelopeInterface::fromMessage(...)`. +- A value is applied only if it is a string, the class exists, and it implements `EnvelopeInterface`. Otherwise it is ignored. + +## Built-in envelopes + +### IdEnvelope + +`Yiisoft\Queue\Message\IdEnvelope` adds a message identifier into metadata under the `IdEnvelope::MESSAGE_ID_KEY` key (`"yii-message-id"`). + +This envelope is used to carry the adapter-provided message ID through the message lifecycle. + +### FailureEnvelope + +`Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope` stores failure-handling metadata under the `FailureEnvelope::FAILURE_META_KEY` key (`"failure-meta"`). + +The envelope merges failure metadata when building `getMetadata()`. diff --git a/docs/guide/en/error-handling.md b/docs/guide/en/error-handling.md index 2a17c11a..5912c3f5 100644 --- a/docs/guide/en/error-handling.md +++ b/docs/guide/en/error-handling.md @@ -1,12 +1,68 @@ # Error handling on message processing -Often when some message handling is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with _Failure Handling Middleware Pipeline_. It is triggered each time message processing via Consume Middleware Pipeline is interrupted with any `Throwable`. +Often when some message handling is failing, we want to retry its execution a couple more times or redirect it to another queue channel. In `yiisoft/queue` this is handled by the failure handling middleware pipeline. -## Configuration +## When failure handling is triggered + +Failure handling is triggered only when message processing throws a `Throwable`. + +In practice it means: + +- The worker runs message processing in `Yiisoft\Queue\Worker\Worker::process()`. +- Your message handler is executed through the [consume middleware pipeline](middleware-pipelines.md#consume-pipeline). +- If any `Throwable` escapes that pipeline, the worker switches to the failure handling pipeline. + +## Failure handling pipeline overview (step-by-step) + +1. A message is processed via the consume pipeline + + The worker builds a `Yiisoft\Queue\Middleware\Consume\ConsumeRequest` and dispatches it through `ConsumeMiddlewareDispatcher`. The final consume handler invokes the resolved message handler. + +2. A `Throwable` is caught by the worker + + If any middleware or the message handler throws, `Worker::process()` catches it. + +3. Failure context is wrapped into a request object + + The worker creates a `Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest` ([source](../../../src/Middleware/FailureHandling/FailureHandlingRequest.php)) containing: + + - the message + - the caught exception + - the queue instance (including its channel) -Here below is configuration via `yiisoft/config`. If you don't use it, you should add a middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own. +4. A failure pipeline is selected by queue channel -Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` key of the `params` config to work with the `yiisoft/config`. You can define different failure handling pipelines for each queue channel. Let's see and describe an example: + `FailureMiddlewareDispatcher::dispatch()` selects which pipeline to run: + + - It tries to use the pipeline configured for the current queue channel. + - If there is no pipeline for that channel (or it is empty), it falls back to `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`. + +5. Failure middlewares are executed + + The dispatcher builds a lazy middleware stack (`MiddlewareFailureStack`) and invokes it. + + Each failure middleware implements `MiddlewareFailureInterface`: + + - It receives the `FailureHandlingRequest` and a continuation handler. + - It may handle the failure by re-queueing the message (same or different queue/channel), optionally with a delay. + - If it decides not to handle the failure, it calls `$handler->handleFailure($request)` to continue the pipeline. + +6. If nothing handles the failure, the exception is rethrown + + The failure pipeline ends with `FailureFinalHandler`, which throws `$request->getException()`. + +7. The worker wraps and rethrows + + If the failure pipeline itself ends with an exception, `Worker::process()` wraps it into `Yiisoft\Queue\Exception\JobFailureException` (including message id from `IdEnvelope` metadata when available) and throws it. + +## What “handled failure” means + +A failure is considered handled if the failure pipeline returns a `FailureHandlingRequest` without throwing. +In practice, built-in middlewares handle failures by re-queueing the message (same or different queue/channel), optionally with a delay, and returning the updated request. + +## Configuration + +Here below is configuration via [yiisoft/config](https://github.com/yiisoft/config) (see also [Configuration with yiisoft/config](configuration-with-config.md)). If you don't use it, you should add a middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` [by your own](configuration-manual.md). You can define different failure handling pipelines for each queue channel (see [Queue channels](channels.md)). The example below defines two different failure handling pipelines: ```php 'yiisoft/queue' => [ @@ -39,11 +95,15 @@ Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` ke ] ``` -Keys here except `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` are queue channel names, and values are lists of `FailureMiddlewareInterface` definitions. `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` defines a default pipeline to apply to channels without an explicitly defined failure strategy pipeline. Each middleware definition must be one of: +Here is the meaning of the keys: +- The `failed-messages` key couples the defined pipeline with the `failed-messages` queue channel. +- The `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` key couples the defined pipeline with all queue channels without an explicitly defined failure strategy pipeline. + +Each middleware definition must be one of: - A ready-to-use `MiddlewareFailureInterface` object like `new FooMiddleware()`. - A valid definition for the [yiisoft/definitions](https://github.com/yiisoft/definitions). It must describe an object, implementing the `MiddlewareFailureInterface`. -- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the `yiisoft/injector`, so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where an object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`. -- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`. +- An [extended callable definition](callable-definitions-extended.md). +- An id string for your DI container to resolve a middleware, e.g. `FooMiddleware::class`. In the example above failures will be handled this way (look the concrete middleware description below): @@ -55,36 +115,86 @@ In the example above failures will be handled this way (look the concrete middle Failures of messages, which are initially sent to the `failed-messages` channel, will only be handled by the 3rd and the 4th points of this list. ## Default failure handling strategies + + Let's see the built-in defaults. + + ### [SendAgainMiddleware](../../../src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php) + + This strategy simply resends the given message to a queue. Let's see the constructor parameters through which it's configured: + + - `id` - A unique string. Allows to use this strategy more than once for the same message, just like in example above. + - `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up. + - `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from. + + State tracking: + + - Uses `FailureEnvelope` metadata (`failure-meta`) to store the per-middleware attempt counter. + - The counter key is `failure-strategy-resend-attempts-{id}`. + + ### [ExponentialDelayMiddleware](../../../src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php) + + This strategy does the same thing as the `SendAgainMiddleware` with a single difference: it resends a message with an exponentially increasing delay. The delay **must** be implemented by the used `AdapterInterface` implementation. + +It's configured via constructor parameters, too. Here they are: + +- `id` - A unique string allows to use this strategy more than once for the same message, just like in example above. +- `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up. + - `delayInitial` - The initial delay that will be applied to a message for the first time. It must be a positive float. + - `delayMaximum` - The maximum delay which can be applied to a single message. Must be above the `delayInitial`. + - `exponent` - Message handling delay will be multiplied by exponent each time it fails. + - `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from. -Let's see the built-in defaults. + Requirements: -### SendAgainMiddleware + - Requires a `DelayMiddlewareInterface` implementation and an adapter that supports delayed delivery. -This strategy simply resends the given message to a queue. Let's see the constructor parameters through which it's configured: + State tracking: -- `id` - A unique string. Allows to use this strategy more than once for the same message, just like in example above. -- `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up. -- `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from. + - Uses `FailureEnvelope` metadata (`failure-meta`) to store attempts and the previous delay. + - The per-middleware keys are: -### ExponentialDelayMiddleware + - `failure-strategy-exponential-delay-attempts-{id}` + - `failure-strategy-exponential-delay-delay-{id}` -This strategy does the same thing as the `SendAgainMiddleware` with a single difference: it resends a message with an exponentially increasing delay. The delay **must** be implemented by the used `AdapterInterface` implementation. + ## Built-in failure handling components -It's configured via constructor parameters, too. Here they are: + This package ships the following built-in failure handling components. -- `id` - A unique string allows to use this strategy more than once for the same message, just like in example above. -- `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up. -- `delayInitial` - The initial delay that will be applied to a message for the first time. It must be a positive float. -- `delayMaximum` - The maximum delay which can be applied to a single message. Must be above the `delayInitial`. -- `exponent` - Message handling delay will be multiplied by exponent each time it fails. -- `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from. + ### FailureEnvelope + + Class: `Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope` + + Behavior: + + - An envelope that stores failure-related metadata under the `failure-meta` key. + - Built-in failure middlewares use it to persist retry counters / delay parameters across retries. + + ### FailureFinalHandler + + Class: `Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler` + + Behavior: + + - Terminal failure handler. + - Throws the exception from the request when the failure pipeline does not handle the failure. + + ### JobFailureException + + Class: `Yiisoft\Queue\Exception\JobFailureException` + + Behavior: + + - Thrown by the worker when failure handling does not resolve the issue. + - Wraps the original exception and includes the queue message id (if available) in the exception message. ## How to create a custom Failure Middleware? All you need is to implement the `MiddlewareFailureInterface` and add your implementation definition to the [configuration](#configuration). -This interface has the only method `handle`. And the method has these parameters: -- `ConsumeRequest $request` - a request for a message handling. It consists of a message and a queue the message came from. -- `Throwable $exception` - an exception thrown on the `request` handling +This interface has the only method `handle` with these parameters: +- [`FailureHandlingRequest $request`](../../../src/Middleware/FailureHandling/FailureHandlingRequest.php) - a request for a message handling. It consists of + - a [message](../../../src/Message/MessageInterface.php) + - a `Throwable $exception` object thrown on the `request` handling + - a queue the message came from - `MessageFailureHandlerInterface $handler` - failure strategy pipeline continuation. Your Middleware should call `$pipeline->handle()` when it shouldn't interrupt failure pipeline execution. -> Note: your strategy have to check by its own if it should be applied. Look into [`SendAgainMiddleware::suites()`](../../src/Middleware/Implementation/FailureMiddleware/Middleware/SendAgainMiddleware.php#L52) for an example. +> Note: your strategy have to check by its own if it should be applied. Look into [`SendAgainMiddleware::suites()`](../../../src/Middleware/FailureHandling/Implementation/SendAgainMiddleware.php#L54) for an example. diff --git a/docs/guide/en/job-status.md b/docs/guide/en/job-status.md new file mode 100644 index 00000000..9e3224ca --- /dev/null +++ b/docs/guide/en/job-status.md @@ -0,0 +1,86 @@ +# Job status + +Yii Queue can report a job status by its message ID. + +The API surface is: + +- `QueueInterface::status(string|int $id): JobStatus` +- `AdapterInterface::status(string|int $id): JobStatus` + +Status tracking support depends on the adapter. If an adapter doesn't store IDs or doesn't keep status history, you might not be able to use `status()` reliably. + +## Getting a message ID + +`QueueInterface::push()` returns a `MessageInterface`. When the adapter supports IDs, the returned message is typically wrapped into an `IdEnvelope`, which stores the ID in message metadata. + +To read the ID: + +```php +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; +``` + +If `$id` is `null`, the current adapter didn't provide an ID and you can't query a status. + +The ID type (`string` or `int`) and how long it stays queryable are adapter-specific. + +## Statuses + +Statuses are represented by the `Yiisoft\Queue\JobStatus` enum: + +- `JobStatus::WAITING` + The job exists in the queue and is waiting for execution. + +- `JobStatus::RESERVED` + A worker has taken the job for processing. + +- `JobStatus::DONE` + The job has been processed. + +In addition to enum cases, `JobStatus` provides a string key via `JobStatus::key()`: + +```php +$statusKey = $status->key(); // "waiting", "reserved" or "done" +``` + +## Querying a status + +```php +use Yiisoft\Queue\JobStatus; +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; + +if ($id === null) { + throw new \RuntimeException('The adapter did not provide a message ID, status tracking is unavailable.'); +} + +$status = $queue->status($id); + +if ($status === JobStatus::WAITING) { + // The job is waiting for execution. +} + +if ($status === JobStatus::RESERVED) { + // A worker is processing the job right now. +} + +if ($status === JobStatus::DONE) { + // The job has been processed. +} +``` + +## Errors and edge cases + +- **Unknown ID** + If an adapter can't find the message by ID, it must throw `InvalidArgumentException`. + +- **Timing** + `RESERVED` can be transient: depending on the adapter, a job may move from `WAITING` to `RESERVED` and then to `DONE` quickly. + +- **Failures / retries** + Job failures and retries are handled by the worker and middleware pipelines and are described in [Errors and retryable jobs](./error-handling.md). + How failures affect job status is adapter-specific. diff --git a/docs/guide/en/loops.md b/docs/guide/en/loops.md index e69de29b..dd957df4 100644 --- a/docs/guide/en/loops.md +++ b/docs/guide/en/loops.md @@ -0,0 +1,118 @@ +# Loops + +Yii Queue uses `\Yiisoft\Queue\Cli\LoopInterface` to control long-running execution. + +The loop is checked: + +- After each processed message (via `Queue::run()` / `Queue::listen()`). +- On each iteration of `queue:listen-all`. + +When the loop says it **cannot continue**, consuming stops gracefully (as soon as the current message is finished). + +See also: + +- [Console commands](console-commands.md) +- [Workers](worker.md) + +## Loop interface + +The interface is minimal: + +```php +namespace Yiisoft\Queue\Cli; + +interface LoopInterface +{ + public function canContinue(): bool; +} +``` + +Adapters receive a callback that returns `bool`. When the callback returns `false`, the adapter should stop consuming. + +## Built-in implementations + +### `SignalLoop` + +`\Yiisoft\Queue\Cli\SignalLoop` is used by default when `ext-pcntl` is available. + +It supports: + +- Graceful shutdown on `SIGHUP`, `SIGINT`, `SIGTERM`. +- Pause/resume via `SIGTSTP` and `SIGCONT`. +- Optional soft memory limit (see below). + +### `SimpleLoop` + +`\Yiisoft\Queue\Cli\SimpleLoop` is used by default when `ext-pcntl` is **not** available. + +It supports: + +- Optional soft memory limit. + +## Soft memory limit + +Both built-in loops accept `memorySoftLimit` (in bytes): + +- `0` means “no limit”. +- When the current process memory usage reaches the limit, `canContinue()` returns `false`. + +This is useful for recycling long-running workers in process managers such as systemd or Supervisor. + +## Configuration + +### With `yiisoft/config` + +By default, `LoopInterface` is resolved to `SignalLoop` when `ext-pcntl` is available, otherwise to `SimpleLoop`. + +To set a soft memory limit, configure both loop implementations: + +```php +use Yiisoft\Queue\Cli\SignalLoop; +use Yiisoft\Queue\Cli\SimpleLoop; + +return [ + SignalLoop::class => [ + '__construct()' => [ + 'memorySoftLimit' => 256 * 1024 * 1024, + ], + ], + SimpleLoop::class => [ + '__construct()' => [ + 'memorySoftLimit' => 256 * 1024 * 1024, + ], + ], +]; +``` + +To force a specific implementation regardless of `ext-pcntl` availability, override `LoopInterface` binding: + +```php +use Yiisoft\Queue\Cli\LoopInterface; +use Yiisoft\Queue\Cli\SimpleLoop; + +return [ + LoopInterface::class => SimpleLoop::class, +]; +``` + +### Manual configuration (without `yiisoft/config`) + +Instantiate the loop you want and pass it to `Queue` (and, depending on adapter, to adapter constructor as well): + +```php +use Yiisoft\Queue\Cli\SignalLoop; + +$loop = new SignalLoop(memorySoftLimit: 256 * 1024 * 1024); +``` + +## Writing a custom loop + +Implement `LoopInterface` and encapsulate your own stopping conditions: + +- Time limits. +- Message count limits. +- External stop flags. +- Integration with your own signal / shutdown handling. + +The only requirement is that `canContinue()` returns `false` when the worker should stop. + diff --git a/docs/guide/en/message-handler.md b/docs/guide/en/message-handler.md new file mode 100644 index 00000000..0c25261f --- /dev/null +++ b/docs/guide/en/message-handler.md @@ -0,0 +1,135 @@ +# Message handler + +A *message handler* is what processes a queue message. Internally, `Yiisoft\Queue\Worker\Worker` resolves a handler by the message handler name (`MessageInterface::getHandlerName()`) and then executes it through `Yiisoft\Injector\Injector`. + +Handler definitions are configured in: + +- `$params['yiisoft/queue']['handlers']` when using [yiisoft/config](https://github.com/yiisoft/config), or +- the `$handlers` argument of `Yiisoft\Queue\Worker\Worker` when creating it manually. + +## Supported handler definition formats + +### 1. HandlerInterface implementation (without mapping) + +If your handler is a dedicated class implementing `Yiisoft\Queue\Message\MessageHandlerInterface`, you can use the class name itself as the message handler name (FQCN) if your DI container can resolve the handler class. + +> By default the [yiisoft/di](https://github.com/yiisoft/di) container resolves all FQCNs into corresponding class objects. + +This is the default and most convenient option when the producer and the consumer are the same application. + +In this setup, you usually don't need to configure handler mapping at all as long as your DI container can resolve the handler class. + +**Message**: + +```php +new \Yiisoft\Queue\Message\Message(\App\Queue\RemoteFileHandler::class, ['url' => '...']); +``` + +**Handler**: + +```php +final class RemoteFileHandler implements \Yiisoft\Queue\Message\MessageHandlerInterface +{ + public function handle(\Yiisoft\Queue\Message\MessageInterface $message): void + { + // Handle the message + } +} +``` + +**Config**: + +Not needed + +**Pros**: + +- Minimal configuration. +- Stable refactoring inside the same application (rename-safe if you rename the class and update the producer code). +- Easy to unit-test the handler as a normal class. + +**Cons**: + +- Couples produced messages to PHP class names. +- Requires producer and consumer to share the same naming contract (usually “same app”). + +**Use when**: + +- Producer and consumer are the same application. +- You control message creation code and can safely use FQCN as the handler name. + +### 2. Named handlers + +In this case you should use a proper handler name when pushing a `Message` instead of a handler class name as in the example above: + +```php +new \Yiisoft\Queue\Message\Message('send-email', ['data' => '...']); +``` + +**Config**: + +Map handler name to a closure in `$params`: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'send-email' => /** handler definition */, + ], + ], +]; +``` + +Handler definition should be either an [extended callable definition](./callable-definitions-extended.md) or a string for your DI container to resolve a `MessageHandlerInterface` instance. + +## When mapping by short names is a better idea + +While FQCN-as-name is convenient inside a single application, mapping by a short name is often a better contract. That is true when messages are produced outside the current codebase, or when you want to create a stable public API for inter-service communication. + +**Typical cases**: + +- Another application pushes messages to the same broker. +- A different language/runtime produces messages. +- You want a stable public contract that is independent of your PHP namespaces and refactorings. + +In these cases you typically keep message handler names small and stable, and map them in config: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => \App\Queue\RemoteFileHandler::class, + ], + ], +]; +``` + +This way external producers never need to know your internal PHP class names. + +## Common pitfalls and unsupported formats + +- A class-string that is not resolvable via `$container->has()` will not be auto-instantiated. +- [yiisoft/definitions](https://github.com/yiisoft/definitions) array format (like `['class' => ..., '__construct()' => ...]`) is **not** supported for handlers. + +## Recommended handler implementation styles + +- Prefer a dedicated handler class registered in DI. +- For maximal compatibility with the worker resolution rules either: + - Implement `MessageHandlerInterface` + - Make the handler invokable (`__invoke(MessageInterface $message): void`) + - Provide `[HandlerClass::class, 'handle']` and keep `handle(MessageInterface $message): void` as the entry point + +## Config location ([yiisoft/config](https://github.com/yiisoft/config)) + +When using [yiisoft/config](https://github.com/yiisoft/config), configure handlers under the [`yiisoft/queue`](https://github.com/yiisoft/queue) params key: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], +]; +``` + +This config is consumed by the DI definitions from `config/di.php` where the `Worker` is constructed with `$params['yiisoft/queue']['handlers']`. diff --git a/docs/guide/en/middleware-pipelines.md b/docs/guide/en/middleware-pipelines.md new file mode 100644 index 00000000..d6d4d47e --- /dev/null +++ b/docs/guide/en/middleware-pipelines.md @@ -0,0 +1,160 @@ +# Middleware pipelines + +Yii Queue uses middlewares to run custom logic around message pushing and message processing. + +A middleware is a piece of code that receives a request object and can either: + +- change the request (for example, change the message, adapter, or error handling behavior) and continue the pipeline, or +- stop the pipeline by returning without calling the next handler. + +This is similar to HTTP middleware, but it is applied to queue messages. + +## What middlewares are for + +Common reasons to add middlewares: + +- **Collect metrics** + You can count pushed/processed messages, measure handler duration, or measure time between push and consume. +- **Add tracing / correlation data** + You can put trace ids or correlation ids into message metadata so logs from producer/consumer are connected. +- **Logging and observability** + You can log message ids, channels, attempts, and failures in a consistent way. +- **Modify the message payload** + You can obfuscate sensitive data, normalize payload, add extra fields required by consumers, or wrap a message into envelopes. +- **Route and schedule** + You can switch channel, choose a different adapter, or add delay when the adapter supports it. + +## Pipelines overview + +Each message may pass through three independent pipelines: + +- **Push pipeline** (executed when calling `QueueInterface::push()`). +- **Consume pipeline** (executed when a worker processes a message). +- **Failure handling pipeline** (executed when message processing throws a `Throwable`). + +The execution order inside a pipeline is forward in the same order you configured middlewares. + +```mermaid +graph LR + StartPush((Start)) --> PushMiddleware1[$middleware1] --> PushMiddleware2[$middleware2] --> Push(Push to a queue) + -.-> PushMiddleware2[$middleware2] -.-> PushMiddleware1[$middleware1] + PushMiddleware1[$middleware1] -.-> EndPush((End)) + + + StartConsume((Start)) --> ConsumeMiddleware1[$middleware1] --> ConsumeMiddleware2[$middleware2] --> Consume(Consume / handle) + -.-> ConsumeMiddleware2[$middleware2] -.-> ConsumeMiddleware1[$middleware1] + ConsumeMiddleware1[$middleware1] -.-> EndConsume((End)) + + + Consume -- Throwable --> StartFailure((Start failure)) + StartFailure --> FailureMiddleware1[$failure1] --> FailureMiddleware2[$failure2] --> Failure(Handle failure / retry / requeue) + -.-> FailureMiddleware2[$failure2] -.-> FailureMiddleware1[$failure1] + FailureMiddleware1[$failure1] -.-> EndFailure((End failure)) +``` + +## How to define a middleware + +You can use any of these formats: + +- A ready-to-use middleware object. +- An array in the format of [yiisoft/definitions](https://github.com/yiisoft/definitions), which defines a middleware implementation. +- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`. +- An [extended callable definition](callable-definitions-extended.md). A callable should either be a middleware itself or return a configured middleware object. + +The required interface depends on the pipeline: + +- Push: `Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface` +- Consume: `Yiisoft\Queue\Middleware\Consume\MiddlewareConsumeInterface` +- Failure handling: `Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface` + +## Push pipeline + +The push pipeline is executed when calling `QueueInterface::push()`. + +Push middlewares can: + +- Modify the message (wrap it into envelopes, add metadata, obfuscate data, etc.). +- Modify the adapter (change channel, add delay, route to a different backend, etc.). + +In particular, push middlewares may define or replace the adapter that will be used to push the message. This can be useful when: + +- You choose a backend dynamically (for example, based on message type or payload). +- You route messages to different channels/backends (for example, `critical` vs `low`). +- You apply scheduling/delay logic in a middleware. + +The adapter is set by returning a modified request: + +```php +return $pushRequest->withAdapter($adapter); +``` + +### Adapter must be configured by the end of the pipeline + +The pipeline ends with a final handler that actually pushes the message using the adapter. + +If the adapter is not configured by the time the pipeline reaches the final handler, +`Yiisoft\Queue\Exception\AdapterNotConfiguredException` is thrown. + +### Custom push middleware + +Implement `MiddlewarePushInterface` and return a modified `PushRequest` from `processPush()`: + +```php +return $pushRequest + ->withMessage($newMessage) + ->withAdapter($newAdapter); +``` + +## Consume pipeline + +The consume pipeline is executed by the worker while processing a message. + +Consume middlewares are often used to modify the message and/or collect runtime information: + +- Measure handler execution time. +- Add correlation ids and include them into logs. +- Convert thrown exceptions into domain-specific failures. + +The final handler of the consume pipeline invokes the resolved message handler. + +## Failure handling pipeline + +When a `Throwable` escapes the consume pipeline, the worker switches to the failure handling pipeline. + +The pipeline receives a `FailureHandlingRequest` that contains: + +- the message +- the caught exception +- the queue instance + +The pipeline is selected by queue channel; if there is no channel-specific pipeline configured, +`FailureMiddlewareDispatcher::DEFAULT_PIPELINE` is used. + +See [Error handling on message processing](error-handling.md) for the step-by-step flow and built-in middlewares. + +## Configuration + +### With yiisoft/config + +When using [yiisoft/config](https://github.com/yiisoft/config), pipelines are configured in params under `yiisoft/queue`: + +- `middlewares-push` +- `middlewares-consume` +- `middlewares-fail` + +See [Configuration with yiisoft/config](configuration-with-config.md) for examples. + +### Manual configuration (without yiisoft/config) + +When configuring the component manually, you instantiate the middleware dispatchers and pass them to `Queue` / `Worker`. + +See [Manual configuration](configuration-manual.md) for a full runnable example. + +## Runtime overrides + +You can override middleware stacks at runtime: + +- `Queue::withMiddlewares(...)` replaces the whole push middleware stack for that queue instance. +- `Queue::withMiddlewaresAdded(...)` appends middlewares to the existing stack. + +These methods affect only the push pipeline of that `Queue` instance. diff --git a/docs/guide/en/prerequisites-and-installation.md b/docs/guide/en/prerequisites-and-installation.md new file mode 100644 index 00000000..bdcb33ba --- /dev/null +++ b/docs/guide/en/prerequisites-and-installation.md @@ -0,0 +1,24 @@ +# Prerequisites and installation + +## Requirements + +- PHP 8.1 or higher. +- PCNTL extension for signal handling (optional, recommended for production use). + +If `ext-pcntl` is not installed, workers cannot handle OS signals (such as `SIGTERM`/`SIGINT`) gracefully. +In practice it means a process manager may terminate a worker at any time, which can interrupt a job in the middle of execution. +See [Loops](loops.md) for details. + +## Installation + +Install the package with [Composer](https://getcomposer.org): + +```shell +composer require yiisoft/queue +``` + +## Next steps + +- [Configuration with yiisoft/config](configuration-with-config.md) +- [Manual configuration](configuration-manual.md) +- [Adapter list](adapter-list.md) diff --git a/docs/guide/en/producing-messages-from-external-systems.md b/docs/guide/en/producing-messages-from-external-systems.md new file mode 100644 index 00000000..647ba98c --- /dev/null +++ b/docs/guide/en/producing-messages-from-external-systems.md @@ -0,0 +1,132 @@ +# Producing messages from external systems + +This guide explains how to publish messages to a queue backend (RabbitMQ, Kafka, SQS, etc.) from *external producers* (including non-PHP services) so that `yiisoft/queue` consumers can correctly deserialize and process these messages. + +The key idea is simple: + +- The queue adapter reads a *raw payload* (usually a string) from the broker. +- The adapter passes that payload to a `Yiisoft\Queue\Message\MessageSerializerInterface` implementation. +- By default, `yiisoft/queue` config binds `MessageSerializerInterface` to `Yiisoft\Queue\Message\JsonMessageSerializer`. + +`JsonMessageSerializer` is only the default implementation. You can replace it with your own serializer by rebinding `Yiisoft\Queue\Message\MessageSerializerInterface` in your DI configuration. + +So, external systems should produce the **same payload format** that your consumer-side serializer expects (JSON described below is for the default `JsonMessageSerializer`). + +## 1. Handler name contract (most important part) + +`yiisoft/queue` resolves a handler by message handler name (`MessageInterface::getHandlerName()`). + +For external producers, you should not rely on PHP FQCN handler names. Prefer a stable short name and map it in the consumer application configuration (see [Message handler](message-handler.md)). + +Example mapping: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => \App\Queue\RemoteFileHandler::class, + ], + ], +]; +``` + +External producer then always publishes `"name": "file-download"`. + +## 2. JSON payload format (JsonMessageSerializer) + +`Yiisoft\Queue\Message\JsonMessageSerializer` expects the message body to be a JSON object with these keys: + +- `name` (string, required) +- `data` (any JSON value, optional; defaults to `null`) +- `meta` (object, optional; defaults to `{}`) + +Minimal example: + +```json +{ + "name": "file-download", + "data": { + "url": "https://example.com/file.pdf", + "destinationFile": "/tmp/file.pdf" + } +} +``` + +Full example: + +```json +{ + "name": "file-download", + "data": { + "url": "https://example.com/file.pdf", + "destinationFile": "/tmp/file.pdf" + }, + "meta": { + "trace-id": "1f2c0e10b7b44c67", + "tenant-id": "acme" + } +} +``` + +### Notes about `meta` + +The `meta` key is used by `yiisoft/queue` for internal processing (like tracing and correlation) and should not be set by external systems. + +## 3. Data encoding rules + +- The payload must be UTF-8 JSON. +- `data` and `meta` must contain only JSON-encodable values: + - strings, numbers, booleans, null + - arrays + - objects (maps) + +If your broker stores bytes, publish the UTF-8 bytes of the JSON string. + +## 4. Publishing to a broker: what exactly to send + +`yiisoft/queue` itself does not define a network protocol. The exact “where” this JSON goes depends on the adapter: + +- Some adapters put this JSON into the broker message **body**. +- Some adapters may additionally use broker headers/attributes. + +For external producers you should: + +- Use the adapter documentation of your chosen backend (AMQP / Kafka / SQS / etc.) to know which queue/topic and routing settings to use. +- Ensure the **message body** is exactly the JSON described above (unless the adapter docs explicitly say otherwise). + +## 5. Examples (non-PHP) + +These examples show how to produce the JSON body. You still need to publish it with your broker-specific client. + +### Python (constructing JSON body) + +```python +import json + +payload = { + "name": "file-download", + "data": {"url": "https://example.com/file.pdf"} +} + +body = json.dumps(payload, ensure_ascii=False).encode("utf-8") +``` + +### Node.js (constructing JSON body) + +```js +const payload = { + name: 'file-download', + data: { url: 'https://example.com/file.pdf' }, +}; + +const body = Buffer.from(JSON.stringify(payload), 'utf8'); +``` + +### curl (for HTTP-based brokers / gateways) + +```sh +curl -X POST \ + -H 'Content-Type: application/json' \ + --data '{"name":"file-download","data":{"url":"https://example.com/file.pdf"}}' \ + https://your-broker-gateway.example.com/publish +``` diff --git a/docs/guide/en/usage.md b/docs/guide/en/usage.md index 9a663a88..e7c450fa 100644 --- a/docs/guide/en/usage.md +++ b/docs/guide/en/usage.md @@ -1,5 +1,9 @@ # Usage basics +## Queue channels + +For a detailed explanation of what channels are and how to configure and use them (including CLI examples), see [Queue channels](channels.md). + ## Configuration You can configure it with a DI container in the following way: @@ -44,8 +48,13 @@ $queue->push($message); To push a job into the queue that should run after 5 minutes: +Delayed execution is implemented via a push middleware. +The middleware must implement `\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface` and be provided by the adapter package you use. +For example, the official AMQP adapter supports delays: + ```php -// TODO +$delayMiddleware = $container->get(\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface::class); +$queue->push($message, $delayMiddleware->withDelay(5 * 60)); ``` **Important:** Not every adapter (such as synchronous adapter) supports delayed execution. @@ -57,26 +66,42 @@ The exact way how a job is executed depends on the adapter used. Most adapters c console commands, which the component registers in your application. For more details, check the respective adapter documentation. +If you configured multiple channels, you can choose which channel to consume with console commands: + +```sh +yii queue:listen [channel] +yii queue:run [channel1 [channel2 [...]]] +yii queue:listen-all [channel1 [channel2 [...]]] +``` + ## Job status ```php -// Push a job into the queue and get a message ID. -$id = $queue->push(new SomeJob()); +use Yiisoft\Queue\JobStatus; +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; + +if ($id === null) { + throw new \RuntimeException('The adapter did not provide a message ID, status tracking is unavailable.'); +} -// Get job status. $status = $queue->status($id); // Check whether the job is waiting for execution. -$status->isWaiting(); +$status === JobStatus::WAITING; // Check whether a worker got the job from the queue and executes it. -$status->isReserved($id); +$status === JobStatus::RESERVED; // Check whether a worker has executed the job. -$status->isDone($id); +$status === JobStatus::DONE; ``` +For details and edge cases, see [Job status](job-status.md). + ## Limitations When using queues, it is important to remember that tasks are put into and obtained from the queue in separate diff --git a/docs/guide/en/worker.md b/docs/guide/en/worker.md index 7ac12ea1..5fdaf914 100644 --- a/docs/guide/en/worker.md +++ b/docs/guide/en/worker.md @@ -1,45 +1,13 @@ -# Configuration - -To use a worker, you should resolve its dependencies (e.g., through DI container) and define handlers for each message -that will be consumed by this worker; - -Handlers are callables indexed by payload names. When a message is consumed from the queue, a callable associated with -its payload name is called. - -## Handler format - -Handler can be any callable with a couple of additions: - -- If a handler is provided as an array of two strings, it will be treated as a DI container service id and its method. - E.g. `[ClassName::class, 'handle']` will be resolved to: - ```php - $container - ->get(ClassName::class) - ->handle(); - ``` -- An `Injector` is used to call the handlers. This means you can define handlers as closures with their own dependencies - which will be resolved with DI container. In the example below you can see a closure in which `message` will be taken - from the queue and `ClientInterface` will be resolved via DI container. - - ```php - 'payloadName' => fn (MessageInterface $message, ClientInterface $client) => $client->send($message->getPayloadData()), - ``` - - ```php - $handlers = [ - 'simple' => fn() => 'someWork', - 'anotherHandler' => [QueueHandlerCollection::class, 'methodName'] - ]; - $worker = new Worker( - $handlers, - new \Psr\Log\NullLogger(), - new \Yiisoft\Injector\Injector($DIContainer), - $DIContainer - ); - ``` +# Worker + +To use a worker, you should resolve its dependencies (e.g., through DI container) and [define handlers](message-handler.md) for each message that will be consumed by this worker. ## Starting Workers +To start a worker, you should run the console commands such as `queue:run`, `queue:listen`, and `queue:listen-all`. See [Console commands](console-commands.md) for details. + +Below are three popular ways to run consumers in production so that they keep running in memory and are automatically restarted if needed. + ### Supervisor [Supervisor](http://supervisord.org) is a process monitor for Linux. It automatically starts console processes. diff --git a/src/Command/ListenAllCommand.php b/src/Command/ListenAllCommand.php index 9703ee59..31a7041e 100644 --- a/src/Command/ListenAllCommand.php +++ b/src/Command/ListenAllCommand.php @@ -18,7 +18,8 @@ 'Listens the all the given queues and executes messages as they come. ' . 'Meant to be used in development environment only. ' . 'Listens all configured queues by default in case you\'re using yiisoft/config. ' - . 'Needs to be stopped manually.' + . 'Needs to be stopped manually.', + ['queue:listen:all'], )] final class ListenAllCommand extends Command { diff --git a/src/Message/JsonMessageSerializer.php b/src/Message/JsonMessageSerializer.php index d9b3a92e..4635b632 100644 --- a/src/Message/JsonMessageSerializer.php +++ b/src/Message/JsonMessageSerializer.php @@ -55,7 +55,7 @@ public function unserialize(string $value): MessageInterface } $meta[EnvelopeInterface::ENVELOPE_STACK_KEY] = []; - $class = $payload['meta']['message-class'] ?? Message::class; + $class = $meta['message-class'] ?? Message::class; // Don't check subclasses when it's a default class: that's faster if ($class !== Message::class && !is_subclass_of($class, MessageInterface::class)) { $class = Message::class; diff --git a/src/Middleware/CallableFactory.php b/src/Middleware/CallableFactory.php index 09c41aa3..81c36618 100644 --- a/src/Middleware/CallableFactory.php +++ b/src/Middleware/CallableFactory.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue\Middleware; +use Closure; use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerInterface; use Psr\Container\NotFoundExceptionInterface; @@ -12,6 +13,7 @@ use function is_array; use function is_callable; +use function is_object; use function is_string; /** @@ -34,28 +36,45 @@ public function __construct( */ public function create(mixed $definition): callable { - $callable = null; + if ($definition === null) { + throw new InvalidCallableConfigurationException(); + } + + if ($definition instanceof Closure) { + return $definition; + } if (is_string($definition) && $this->container->has($definition)) { - // Object with an __invoke() method - $callable = $this->container->get($definition); + $result = $this->container->get($definition); + + if (is_callable($result)) { + return $result; + } + + throw new InvalidCallableConfigurationException(); } if (is_array($definition) && array_keys($definition) === [0, 1] - && is_string($definition[0]) && is_string($definition[1]) ) { - [$className, $methodName] = $definition; - $callable = $this->fromDefinition($className, $methodName); - } + if (is_object($definition[0])) { + $callable = $this->fromObjectDefinition($definition[0], $definition[1]); + if ($callable !== null) { + return $callable; + } + } - if ($callable === null) { - $callable = $definition; + if (is_string($definition[0])) { + $callable = $this->fromDefinition($definition[0], $definition[1]); + if ($callable !== null) { + return $callable; + } + } } - if (is_callable($callable)) { - return $callable; + if (is_callable($definition)) { + return $definition; } throw new InvalidCallableConfigurationException(); @@ -88,4 +107,17 @@ private function fromDefinition(string $className, string $methodName): ?callabl return is_callable($result) ? $result : null; } + + private function fromObjectDefinition(object $object, string $methodName): ?callable + { + try { + new ReflectionMethod($object, $methodName); + } catch (ReflectionException) { + return null; + } + + $result = [$object, $methodName]; + + return is_callable($result) ? $result : null; + } } diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 33c2dff6..29fc47cd 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -5,18 +5,16 @@ namespace Yiisoft\Queue\Worker; use Closure; -use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerInterface; -use Psr\Container\NotFoundExceptionInterface; use Psr\Log\LoggerInterface; -use ReflectionException; -use ReflectionMethod; use RuntimeException; use Throwable; use Yiisoft\Injector\Injector; use Yiisoft\Queue\Exception\JobFailureException; -use Yiisoft\Queue\Message\MessageHandlerInterface; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageHandlerInterface; +use Yiisoft\Queue\Middleware\CallableFactory; +use Yiisoft\Queue\Middleware\InvalidCallableConfigurationException; use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\ConsumeRequest; @@ -30,15 +28,18 @@ final class Worker implements WorkerInterface { + /** @var array Cache of resolved handlers */ private array $handlersCached = []; public function __construct( + /** @var array */ private readonly array $handlers, private readonly LoggerInterface $logger, private readonly Injector $injector, private readonly ContainerInterface $container, private readonly ConsumeMiddlewareDispatcher $consumeMiddlewareDispatcher, private readonly FailureMiddlewareDispatcher $failureMiddlewareDispatcher, + private readonly CallableFactory $callableFactory, ) { } @@ -50,7 +51,12 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa $this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']); $name = $message->getHandlerName(); - $handler = $this->getHandler($name); + try { + $handler = $this->getHandler($name); + } catch (InvalidCallableConfigurationException $exception) { + throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name), 0, $exception); + } + if ($handler === null) { throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name)); } @@ -77,82 +83,27 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa private function getHandler(string $name): ?callable { - if (!array_key_exists($name, $this->handlersCached)) { - $definition = $this->handlers[$name] ?? null; - if ($definition === null && $this->container->has($name)) { - $handler = $this->container->get($name); - if ($handler instanceof MessageHandlerInterface) { - $this->handlersCached[$name] = $handler->handle(...); - - return $this->handlersCached[$name]; - } - - return null; - } - - $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null); - } - - return $this->handlersCached[$name]; - } - - /** - * Checks if the handler is a DI container alias - * - * @param array|callable|object|string|null $definition - * - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - * @return callable|null - */ - private function prepare(callable|object|array|string|null $definition): callable|null - { - if (is_string($definition) && $this->container->has($definition)) { - return $this->container->get($definition); + if ($name === '') { + return null; } - if ( - is_array($definition) - && array_keys($definition) === [0, 1] - && is_string($definition[0]) - && is_string($definition[1]) - ) { - [$className, $methodName] = $definition; - - if (!class_exists($className) && $this->container->has($className)) { - return [ - $this->container->get($className), - $methodName, - ]; - } - - if (!class_exists($className)) { - $this->logger->error("$className doesn't exist."); + if (!array_key_exists($name, $this->handlersCached)) { + $definition = $this->handlers[$name] ?? $name; - return null; - } + if (is_string($definition) && $this->container->has($definition)) { + $resolved = $this->container->get($definition); - try { - $reflection = new ReflectionMethod($className, $methodName); - } catch (ReflectionException $e) { - $this->logger->error($e->getMessage()); + if ($resolved instanceof MessageHandlerInterface) { + $this->handlersCached[$name] = $resolved->handle(...); - return null; - } - if ($reflection->isStatic()) { - return [$className, $methodName]; - } - if ($this->container->has($className)) { - return [ - $this->container->get($className), - $methodName, - ]; + return $this->handlersCached[$name]; + } } - return null; + $this->handlersCached[$name] = $this->callableFactory->create($definition); } - return $definition; + return $this->handlersCached[$name]; } private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface diff --git a/tests/Benchmark/QueueBench.php b/tests/Benchmark/QueueBench.php index 76480523..9c67326f 100644 --- a/tests/Benchmark/QueueBench.php +++ b/tests/Benchmark/QueueBench.php @@ -52,6 +52,7 @@ public function __construct() new MiddlewareFactoryFailure($container, $callableFactory), [], ), + $callableFactory, ); $this->serializer = new JsonMessageSerializer(); $this->adapter = new VoidAdapter($this->serializer); diff --git a/tests/Integration/MessageConsumingTest.php b/tests/Integration/MessageConsumingTest.php index 8e1d40e7..45a281df 100644 --- a/tests/Integration/MessageConsumingTest.php +++ b/tests/Integration/MessageConsumingTest.php @@ -9,6 +9,7 @@ use Yiisoft\Injector\Injector; use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; @@ -28,6 +29,7 @@ public function testMessagesConsumed(): void $this->messagesProcessedSecond = []; $container = $this->createMock(ContainerInterface::class); + $callableFactory = new CallableFactory($container); $worker = new Worker( [ 'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(), @@ -37,7 +39,8 @@ public function testMessagesConsumed(): void new Injector($container), $container, new ConsumeMiddlewareDispatcher($this->createMock(MiddlewareFactoryConsumeInterface::class)), - new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), []) + new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), []), + $callableFactory, ); $messages = [1, 'foo', 'bar-baz']; @@ -56,13 +59,15 @@ public function testMessagesConsumedByHandlerClass(): void $container = $this->createMock(ContainerInterface::class); $container->method('get')->with(TestHandler::class)->willReturn($handler); $container->method('has')->with(TestHandler::class)->willReturn(true); + $callableFactory = new CallableFactory($container); $worker = new Worker( [], new NullLogger(), new Injector($container), $container, new ConsumeMiddlewareDispatcher($this->createMock(MiddlewareFactoryConsumeInterface::class)), - new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), []) + new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), []), + $callableFactory, ); $messages = [1, 'foo', 'bar-baz']; diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index d3752ec1..93ddc900 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -116,6 +116,7 @@ public function testFullStackConsume(): void $container, $consumeMiddlewareDispatcher, $failureMiddlewareDispatcher, + $callableFactory, ); $message = new Message('test', ['initial']); diff --git a/tests/TestCase.php b/tests/TestCase.php index 8f64bd8c..9d3be088 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -136,6 +136,7 @@ protected function createWorker(): WorkerInterface $this->getContainer(), $this->getConsumeMiddlewareDispatcher(), $this->getFailureMiddlewareDispatcher(), + new CallableFactory($this->getContainer()), ); } diff --git a/tests/Unit/Middleware/CallableFactoryTest.php b/tests/Unit/Middleware/CallableFactoryTest.php index ce8247ad..028925ca 100644 --- a/tests/Unit/Middleware/CallableFactoryTest.php +++ b/tests/Unit/Middleware/CallableFactoryTest.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue\Tests\Unit\Middleware; +use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\TestCase; use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\Middleware\InvalidCallableConfigurationException; @@ -11,27 +12,48 @@ final class CallableFactoryTest extends TestCase { - public function testCreateFromContainerStringInvokable(): void + #[DataProvider('positiveDefinitionsProvider')] + public function testCreatePositive(mixed $definition, array $arguments, mixed $expectedResult, SimpleContainer $container): void { + $factory = new CallableFactory($container); + $callable = $factory->create($definition); + + self::assertIsCallable($callable); + self::assertSame($expectedResult, $callable(...$arguments)); + } + + public static function positiveDefinitionsProvider(): iterable + { + yield 'closure' => [ + static fn (): string => 'ok', + [], + 'ok', + new SimpleContainer(), + ]; + + yield 'callable string' => [ + 'strlen', + ['foo'], + 3, + new SimpleContainer(), + ]; + $invokable = new class () { public function __invoke(): string { return 'ok'; } }; - $container = new SimpleContainer([ - 'invokable' => $invokable, - ]); - $factory = new CallableFactory($container); - $callable = $factory->create('invokable'); - - self::assertIsCallable($callable); - self::assertSame('ok', $callable()); - } + yield 'container string invokable' => [ + 'invokable', + [], + 'ok', + new SimpleContainer([ + 'invokable' => $invokable, + ]), + ]; - public function testCreateFromStaticMethodArray(): void - { $class = new class () { public static function ping(): string { @@ -39,33 +61,134 @@ public static function ping(): string } }; $className = $class::class; - $container = new SimpleContainer(); + yield 'static method array' => [ + [$className, 'ping'], + [], + 'pong', + new SimpleContainer(), + ]; + + $serviceFromContainer = new class () { + public function go(): string + { + return 'ok'; + } + }; + $serviceClassName = $serviceFromContainer::class; + + yield 'container object method' => [ + [$serviceClassName, 'go'], + [], + 'ok', + new SimpleContainer([ + $serviceClassName => $serviceFromContainer, + ]), + ]; + + $service = new class () { + public function go(): string + { + return 'ok'; + } + }; + + yield 'object method array' => [ + $service->go(...), + [], + 'ok', + new SimpleContainer(), + ]; + + $serviceById = new class () { + public function go(): string + { + return 'ok'; + } + }; + + yield 'container id method' => [ + ['service', 'go'], + [], + 'ok', + new SimpleContainer([ + 'service' => $serviceById, + ]), + ]; + } + + #[DataProvider('negativeDefinitionsProvider')] + public function testCreateNegative(mixed $definition, SimpleContainer $container): void + { $factory = new CallableFactory($container); - $callable = $factory->create([$className, 'ping']); - self::assertIsCallable($callable); - self::assertSame('pong', $callable()); + $this->expectException(InvalidCallableConfigurationException::class); + $factory->create($definition); } - public function testCreateFromContainerObjectMethod(): void + public static function negativeDefinitionsProvider(): iterable { + yield 'null' => [ + null, + new SimpleContainer(), + ]; + + yield 'string not callable and not in container' => [ + 'notExistingCallable', + new SimpleContainer(), + ]; + + yield 'container string not callable' => [ + 'notCallable', + new SimpleContainer([ + 'notCallable' => new \stdClass(), + ]), + ]; + $service = new class () { public function go(): string { return 'ok'; } }; - $className = $service::class; - $container = new SimpleContainer([ - $className => $service, - ]); - $factory = new CallableFactory($container); - $callable = $factory->create([$className, 'go']); + yield 'object method array invalid method' => [ + [$service, 'missing'], + new SimpleContainer(), + ]; - self::assertIsCallable($callable); - self::assertSame('ok', $callable()); + $class = new class () { + public function ping(): string + { + return 'pong'; + } + }; + $className = $class::class; + + yield 'non-static method array without container' => [ + [$className, 'ping'], + new SimpleContainer(), + ]; + + yield 'invalid array definition' => [ + ['onlyOneElement'], + new SimpleContainer(), + ]; + + $serviceWithoutMethod = new class () { + public function go(): string + { + return 'ok'; + } + }; + $serviceClassName = $serviceWithoutMethod::class; + + yield 'class in container but method missing' => [ + [$serviceClassName, 'missing'], + new SimpleContainer([ + $serviceClassName => $serviceWithoutMethod, + ]), + ]; } public function testFriendlyException(): void diff --git a/tests/Unit/WorkerTest.php b/tests/Unit/WorkerTest.php index b8f466f6..a1a6f035 100644 --- a/tests/Unit/WorkerTest.php +++ b/tests/Unit/WorkerTest.php @@ -22,6 +22,7 @@ use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface; +use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Tests\App\FakeHandler; use Yiisoft\Queue\Tests\App\StaticMessageHandler; @@ -60,6 +61,10 @@ public static function jobExecutedDataProvider(): iterable FakeHandler::class, [FakeHandler::class => new FakeHandler()], ]; + yield 'definition-object' => [ + [new FakeHandler(), 'execute'], + [], + ]; yield 'definition-class' => [ [FakeHandler::class, 'execute'], [FakeHandler::class => new FakeHandler()], @@ -178,6 +183,7 @@ private function createWorkerByParams( $container, new ConsumeMiddlewareDispatcher($consumeMiddlewareFactory), new FailureMiddlewareDispatcher($failureMiddlewareFactory, []), + new CallableFactory($container), ); } @@ -244,13 +250,15 @@ public function testJobFailureIsHandledSuccessfully(): void $failureMiddlewareFactory->method('createFailureMiddleware')->willReturn($failureMiddleware); $failureDispatcher = new FailureMiddlewareDispatcher($failureMiddlewareFactory, ['test-channel' => ['simple']]); + $container = new SimpleContainer(); $worker = new Worker( ['simple' => fn () => null], new NullLogger(), - new Injector(new SimpleContainer()), - new SimpleContainer(), + new Injector($container), + $container, $consumeDispatcher, - $failureDispatcher + $failureDispatcher, + new CallableFactory($container), ); $result = $worker->process($message, $queue);