Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
91b975f
refactor: introduce fluent API for dispatch() helper function
huangdijia Nov 13, 2025
097eed9
docs: add warning about return value assignment in dispatch() function
huangdijia Nov 13, 2025
bfd98db
test: add comprehensive tests for fluent dispatch API
huangdijia Nov 13, 2025
ab8931a
test: refactor DispatchTest to improve container mock setup and excep…
huangdijia Nov 13, 2025
1c87ac2
Update dispatch() return type in test
huangdijia Nov 13, 2025
69813b4
test: add type assertion for dispatch() with closure
huangdijia Nov 13, 2025
e6891a1
Apply suggestion from @huangdijia
huangdijia Nov 13, 2025
e5573f7
Apply suggestion from @huangdijia
huangdijia Nov 13, 2025
16dbeb2
feat: add withHeader method to PendingAmqpProducerMessageDispatch for…
huangdijia Nov 13, 2025
1127b22
feat: add setKey and setValue methods to PendingKafkaProducerMessageD…
huangdijia Nov 13, 2025
cb2ca09
feat: enhance dispatch function to check class existence before type …
huangdijia Nov 13, 2025
dee0cf0
Refactor dispatching mechanism and remove deprecated classes
huangdijia Nov 13, 2025
22dd7e4
feat: 添加 AsyncTaskInterface 的使用声明
huangdijia Nov 13, 2025
04beb0d
fix: 修复导入语句,确保正确使用 Hyperf\Amqp\Producer
huangdijia Nov 13, 2025
7ef7ce2
fix: 修复 dispatch() 测试的返回类型,确保返回布尔值
huangdijia Nov 13, 2025
dc71e9c
fix: 修复导入语句,确保正确使用 FriendsOfHyperf\Support\dispatch 函数
huangdijia Nov 13, 2025
8095595
fix: 添加弃用注释,建议使用 FriendsOfHyperf\Support\dispatch() 替代
huangdijia Nov 13, 2025
ae5fd97
fix: 添加对 ProducerMessage 的 setPoolName 方法的模拟,确保正确处理自定义池名称
huangdijia Nov 13, 2025
af2c45a
更新 Functions.php
huangdijia Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/helpers/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ function di(?string $abstract = null, array $parameters = [])
}

/**
* @deprecated use FriendsOfHyperf\Support\dispatch() instead
* @param AsyncTaskInterface|Closure|JobInterface|ProduceMessage|ProducerMessageInterface|object $job
* @return bool
*/
Expand Down
24 changes: 24 additions & 0 deletions src/support/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,33 @@

use Closure;
use Exception;
use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\AsyncQueue\JobInterface;
use longlang\phpkafka\Producer\ProduceMessage;

use function Hyperf\Support\value;

/**
* Do not assign a value to the return value of this function unless you are very clear about the consequences of doing so.
* @param Closure|JobInterface|ProduceMessage|ProducerMessageInterface|mixed $job
* @return ($job is Closure ? PendingAsyncQueueDispatch : ($job is JobInterface ? PendingAsyncQueueDispatch : ($job is ProducerMessageInterface ? PendingAmqpProducerMessageDispatch : PendingKafkaProducerMessageDispatch)))
* @throws \InvalidArgumentException
*/
function dispatch($job)
{
if ($job instanceof Closure) {
$job = CallQueuedClosure::create($job);
}

return match (true) {
interface_exists(ProducerMessageInterface::class) && $job instanceof ProducerMessageInterface => new PendingAmqpProducerMessageDispatch($job),
class_exists(ProduceMessage::class) && $job instanceof ProduceMessage => new PendingKafkaProducerMessageDispatch($job),
interface_exists(JobInterface::class) && $job instanceof JobInterface => new PendingAsyncQueueDispatch($job),
default => throw new \InvalidArgumentException('Unsupported job type.')
};
}

/**
* Retry an operation a given number of times.
* @template TReturn
Expand Down
76 changes: 76 additions & 0 deletions src/support/src/PendingAmqpProducerMessageDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support;

use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Amqp\Producer;
use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;

/**
* @property array{application_headers?:AMQPTable} $properties
*/
class PendingAmqpProducerMessageDispatch
{
use Conditionable;

public ?string $pool = null;

public int $timeout = 5;

public bool $confirm = false;

public function __construct(protected ProducerMessageInterface $message)
{
}

public function __destruct()
{
$this->pool && $this->message->setPoolName($this->pool);
ApplicationContext::getContainer()
->get(Producer::class)
->produce($this->message, $this->confirm, $this->timeout);
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}
Comment thread
huangdijia marked this conversation as resolved.

public function setPayload(mixed $data): static
{
$this->message->setPayload($data);
return $this;
}

public function withHeader(string $key, mixed $value, ?int $ttl = null): static
{
(function () use ($key, $value, $ttl) {
$this->properties['application_headers'] ??= new \PhpAmqpLib\Wire\AMQPTable(); // @phpstan-ignore-line
$this->properties['application_headers']->set($key, $value, $ttl);
})->call($this->message);
return $this;
}

public function setConfirm(bool $confirm): static
{
$this->confirm = $confirm;
return $this;
}

public function setTimeout(int $timeout): static
{
$this->timeout = $timeout;
return $this;
}
}
56 changes: 56 additions & 0 deletions src/support/src/PendingAsyncQueueDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support;

use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\JobInterface;
use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;

class PendingAsyncQueueDispatch
{
use Conditionable;

public string $pool = 'default';

public int $delay = 0;

public function __construct(protected JobInterface $job)
{
}

public function __destruct()
{
ApplicationContext::getContainer()
->get(DriverFactory::class)
->get($this->pool)
->push($this->job, $this->delay);
}

public function setMaxAttempts(int $maxAttempts): static
{
$this->job->setMaxAttempts($maxAttempts);
return $this;
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}

public function delay(int $delay): static
{
$this->delay = $delay;
return $this;
}
}
67 changes: 67 additions & 0 deletions src/support/src/PendingKafkaProducerMessageDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support;

use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;
use Hyperf\Kafka\ProducerManager;
use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;

/**
* @property array $headers
* @property null|string $key
* @property null|string $value
*/
class PendingKafkaProducerMessageDispatch
{
use Conditionable;

public string $pool = 'default';

public function __construct(protected ProduceMessage $message)
{
}

public function __destruct()
{
ApplicationContext::getContainer()
->get(ProducerManager::class)
->getProducer($this->pool)
->sendBatch([$this->message]);
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}

public function setKey(string $key): static
{
(fn () => $this->key = $key)->call($this->message);
return $this;
}

public function setValue(string $value): static
{
(fn () => $this->value = $value)->call($this->message);
return $this;
}

public function withHeader(string $key, string $value): static
{
$header = (new RecordHeader())->setHeaderKey($key)->setValue($value);
(fn () => $this->headers[] = $header)->call($this->message);
return $this;
}
}
Loading
Loading