Refactor: Introduce fluent API for dispatch() helper function#998
Closed
huangdijia wants to merge 19 commits into
Closed
Refactor: Introduce fluent API for dispatch() helper function#998huangdijia wants to merge 19 commits into
huangdijia wants to merge 19 commits into
Conversation
This commit refactors the dispatch() helper function to return pending dispatch objects instead of immediately executing the dispatch operation. This enables a more flexible and Laravel-like fluent API for configuring dispatch behavior.
Changes:
- Refactored dispatch() function to return Pending dispatch objects
- Added PendingAsyncQueueDispatch for JobInterface dispatching with chainable methods (onPool, delay, setMaxAttempts)
- Added PendingAmqpProducerMessageDispatch for AMQP message dispatching with chainable methods (onPool, setConfirm, setTimeout)
- Added PendingKafkaProducerMessageDispatch for Kafka message dispatching with chainable methods (onPool, withHeader)
- Removed direct dependencies on DriverFactory, Producer, and ProducerManager from Functions.php
- Improved type safety with enhanced PHPDoc annotations
Benefits:
- Cleaner, more intuitive API: dispatch($job)->onPool('custom')->delay(10)
- Better separation of concerns
- More flexible configuration options
- Follows Laravel's dispatch pattern for familiarity
- Deferred execution via __destruct() allows full configuration before dispatch
Breaking changes: None - the basic dispatch($job) usage remains unchanged, but advanced configurations now use the fluent API instead of positional parameters.
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. 🗂️ Base branches to auto review (1)
Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Added extensive test coverage for the new fluent dispatch API including: Test Coverage: - dispatch() function type detection and routing - Closure wrapping in CallQueuedClosure - JobInterface handling - AMQP ProducerMessage handling - Kafka ProduceMessage handling - Invalid type rejection - PendingAsyncQueueDispatch tests - onPool() method chaining - delay() configuration - setMaxAttempts() integration - Fluent API chaining - Destruct execution verification - Conditionable trait (when/unless) - PendingAmqpProducerMessageDispatch tests - onPool() configuration - setConfirm() method - setTimeout() method - Fluent API chaining - Destruct execution verification - Conditionable trait support - PendingKafkaProducerMessageDispatch tests - onPool() configuration - withHeader() method - Multiple header support - Fluent API chaining - Destruct execution verification - Conditionable trait support - Integration tests - Backward compatibility with basic dispatch - Error handling and exception propagation All 24 tests passing with proper mocking of: - DriverFactory and AsyncQueue Driver - AMQP Producer - Kafka ProducerManager and Producer - ApplicationContext container setup
Changed the expected return type of dispatch() from bool to PendingAsyncQueueDispatch in the test to reflect updated behavior.
huangdijia
commented
Nov 13, 2025
huangdijia
commented
Nov 13, 2025
… setting application headers
…ispatch for setting message properties
…matching for job dispatching
- Deleted `PendingAsyncQueueDispatch`, `PendingKafkaProducerMessageDispatch`, and `DispatchTest` classes. - Introduced new implementations for `PendingAsyncQueueDispatch`, `PendingKafkaProducerMessageDispatch`, and `PendingAmqpProducerMessageDispatch` with enhanced functionality. - Updated the `dispatch` function to accommodate new classes and ensure compatibility with various job types. - Added comprehensive tests for the new dispatching classes to ensure expected behavior and error handling.
huangdijia
commented
Nov 13, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR refactors the
dispatch()helper function to introduce a fluent API pattern, making it more flexible and intuitive to use. Instead of immediately executing dispatch operations, the function now returns pending dispatch objects that can be configured using method chaining before execution.Changes
New Classes
PendingAsyncQueueDispatch: Handles JobInterface dispatching with chainable configuration methods
onPool(string $pool): Specify which queue pool to usedelay(int $delay): Set delay in seconds before job executionsetMaxAttempts(int $maxAttempts): Configure maximum retry attemptsPendingAmqpProducerMessageDispatch: Handles AMQP message dispatching with chainable configuration
onPool(string $pool): Specify which connection pool to usesetConfirm(bool $confirm): Enable/disable producer confirmationsetTimeout(int $timeout): Set operation timeoutPendingKafkaProducerMessageDispatch: Handles Kafka message dispatching with chainable configuration
onPool(string $pool): Specify which producer pool to usewithHeader(string $key, string $value): Add custom headers to messagesModified Files
dispatch()function to return Pending dispatch objects instead of immediately dispatchingAPI Examples
Before (positional parameters)
After (fluent API)
Benefits
dispatch($job)usage still works exactly the same__destruct()Breaking Changes
None. The basic usage
dispatch($job)remains unchanged. Advanced configurations that used positional parameters should migrate to the fluent API.Test Plan
dispatch($job)when()andunless()methods