From d14e78d23a3e4d167049f5912e57d1a0909f6657 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Sun, 5 Apr 2026 16:51:02 +0300 Subject: [PATCH 01/11] Implement grpc namind and load-balancing rfc --- composer.json | 1 + src/Client/Resolver/InvalidTarget.php | 18 ++++ src/Client/Resolver/Scheme.php | 15 +++ src/Client/Resolver/Target.php | 130 +++++++++++++++++++++++ tests/Client/Resolver/TargetTest.php | 143 ++++++++++++++++++++++++++ 5 files changed, 307 insertions(+) create mode 100644 src/Client/Resolver/InvalidTarget.php create mode 100644 src/Client/Resolver/Scheme.php create mode 100644 src/Client/Resolver/Target.php create mode 100644 tests/Client/Resolver/TargetTest.php diff --git a/composer.json b/composer.json index b1fdb29..3434865 100644 --- a/composer.json +++ b/composer.json @@ -19,6 +19,7 @@ ], "require": { "php": "^8.4", + "ext-ctype": "*", "amphp/amp": "^3.1", "amphp/byte-stream": "^2.1", "amphp/http": "^2.1", diff --git a/src/Client/Resolver/InvalidTarget.php b/src/Client/Resolver/InvalidTarget.php new file mode 100644 index 0000000..b1fafd3 --- /dev/null +++ b/src/Client/Resolver/InvalidTarget.php @@ -0,0 +1,18 @@ +value}:"; + + if (str_starts_with($target, $prefix)) { + $addr = substr($target, \strlen($prefix)); + if ($addr === '') { + throw new InvalidTarget($target); + } + + return match ($scheme) { + Scheme::Dns => self::parseDns($addr, $target), + Scheme::Ipv4, Scheme::Ipv6 => self::parseAddressList($scheme, $addr, $target), + }; + } + } + + return self::parseHostPort($target); + } + + /** + * @internal use {@see Target::parse()} instead + * + * @param non-empty-list $addresses + * @param ?non-empty-string $authority DNS server address (only for dns://authority/host form) + */ + public function __construct( + public Scheme $scheme, + public array $addresses, + public ?string $authority = null, + ) {} + + /** + * @param non-empty-string $addr + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parseDns(string $addr, string $target): self + { + $authority = null; + + if (str_starts_with($addr, '//')) { + $addr = substr($addr, 2); + $slash = strpos($addr, '/'); + if ($slash === false) { + throw new InvalidTarget($target); + } + + $auth = substr($addr, 0, $slash); + if ($auth !== '') { + $authority = $auth; + } + + $addr = substr($addr, $slash + 1); + if ($addr === '') { + throw new InvalidTarget($target); + } + } + + self::validateDnsEndpoint($addr, $target); // @phpstan-ignore staticMethod.alreadyNarrowedType + + return new self(Scheme::Dns, [$addr], $authority); + } + + /** + * @param non-empty-string $addr + * @throws InvalidTarget + */ + private static function parseAddressList(Scheme $scheme, string $addr, string $original): self + { + $addresses = explode(',', $addr); + $result = []; + + foreach ($addresses as $address) { + $address = trim($address); + + if ($address === '' || str_contains($address, ' ')) { + throw new InvalidTarget($original); + } + + $result[] = $address; + } + + return new self($scheme, $result); + } + + /** + * @throws InvalidTarget + */ + private static function parseHostPort(string $target): self + { + $colon = strpos($target, ':'); + + if ($colon !== false && !ctype_digit(substr($target, $colon + 1))) { + throw new InvalidTarget($target); + } + + self::validateDnsEndpoint($target); + + return new self(Scheme::Dns, [$target]); + } + + /** + * @phpstan-assert non-empty-string $addr + * @throws InvalidTarget + */ + private static function validateDnsEndpoint(string $addr, ?string $target = null): void + { + if ($addr === '' || str_contains($addr, '/') || str_contains($addr, ' ')) { + throw new InvalidTarget($target ?? $addr); + } + } +} diff --git a/tests/Client/Resolver/TargetTest.php b/tests/Client/Resolver/TargetTest.php new file mode 100644 index 0000000..a9f750d --- /dev/null +++ b/tests/Client/Resolver/TargetTest.php @@ -0,0 +1,143 @@ + + */ + public static function provideParseTargetCases(): iterable + { + yield 'dns:host' => [ + 'dns:myhost', + new Target(Scheme::Dns, ['myhost']), + ]; + + yield 'dns:host:port' => [ + 'dns:myhost:50051', + new Target(Scheme::Dns, ['myhost:50051']), + ]; + + yield 'dns:///host' => [ + 'dns:///myhost', + new Target(Scheme::Dns, ['myhost']), + ]; + + yield 'dns:///host:port' => [ + 'dns:///myhost:50051', + new Target(Scheme::Dns, ['myhost:50051']), + ]; + + yield 'dns://authority/host' => [ + 'dns://authority/myhost', + new Target(Scheme::Dns, ['myhost'], 'authority'), + ]; + + yield 'dns://authority:port/host:port' => [ + 'dns://authority:53/myhost:50051', + new Target(Scheme::Dns, ['myhost:50051'], 'authority:53'), + ]; + + yield 'ipv4:single address' => [ + 'ipv4:192.168.0.1:50051', + new Target(Scheme::Ipv4, ['192.168.0.1:50051']), + ]; + + yield 'ipv4:multiple addresses' => [ + 'ipv4:192.168.0.1:50051,192.168.0.2:50052', + new Target(Scheme::Ipv4, ['192.168.0.1:50051', '192.168.0.2:50052']), + ]; + + yield 'ipv4:multiple addresses with spaces around comma' => [ + 'ipv4:192.168.0.1:50051, 192.168.0.2:50052', + new Target(Scheme::Ipv4, ['192.168.0.1:50051', '192.168.0.2:50052']), + ]; + + yield 'ipv4:address without port' => [ + 'ipv4:10.0.0.1', + new Target(Scheme::Ipv4, ['10.0.0.1']), + ]; + + yield 'ipv6:single address with port' => [ + 'ipv6:[::1]:50051', + new Target(Scheme::Ipv6, ['[::1]:50051']), + ]; + + yield 'ipv6:multiple addresses' => [ + 'ipv6:[::1]:50051,[::2]:50052', + new Target(Scheme::Ipv6, ['[::1]:50051', '[::2]:50052']), + ]; + + yield 'ipv6:address without port' => [ + 'ipv6:::1', + new Target(Scheme::Ipv6, ['::1']), + ]; + + yield 'bare host' => [ + 'myhost', + new Target(Scheme::Dns, ['myhost']), + ]; + + yield 'bare host:port' => [ + 'myhost:50051', + new Target(Scheme::Dns, ['myhost:50051']), + ]; + + yield 'bare localhost:port' => [ + 'localhost:50051', + new Target(Scheme::Dns, ['localhost:50051']), + ]; + } + + /** + * @param non-empty-string $input + */ + #[DataProvider('provideParseTargetThrowsCases')] + public function testParseTargetThrows(string $input): void + { + $this->expectException(InvalidTarget::class); + Target::parse($input); + } + + /** + * @return iterable + */ + public static function provideParseTargetThrowsCases(): iterable + { + yield 'empty string' => ['']; + yield 'uppercase dns scheme is invalid' => ['DNS:myhost']; + yield 'uppercase ipv4 scheme is invalid' => ['IPV4:192.168.0.1:50051']; + yield 'uppercase ipv6 scheme is invalid' => ['IPV6:[::1]:50051']; + yield 'ipv4: no address' => ['ipv4:']; + yield 'ipv6: no address' => ['ipv6:']; + yield 'dns:/// empty host' => ['dns:///']; + yield 'unknown scheme' => ['etcd:myhost']; + yield 'ipv4: trailing comma' => ['ipv4:192.168.0.1,']; + yield 'ipv4: leading comma' => ['ipv4:,192.168.0.1']; + yield 'http scheme' => ['http://localhost:50051']; + yield 'https scheme' => ['https://example.com:443']; + yield 'dns://host without slash' => ['dns://myhost']; + yield 'dns:///host/extra' => ['dns:///myhost/extra']; + yield 'dns: endpoint with spaces' => ['dns:my host']; + yield 'bare host with spaces' => ['my host:50051']; + yield 'bare bracketed ipv6 is invalid' => ['[::1]:50051']; + } +} From f553f147b3058e8d0f5d350e93d95ae54be16d89 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 08:48:00 +0300 Subject: [PATCH 02/11] chore: support unix targets --- src/Client/Resolver/Scheme.php | 1 + src/Client/Resolver/Target.php | 20 +++++++++++++++++++- tests/Client/Resolver/TargetTest.php | 18 ++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/Client/Resolver/Scheme.php b/src/Client/Resolver/Scheme.php index b0cde11..3d1a718 100644 --- a/src/Client/Resolver/Scheme.php +++ b/src/Client/Resolver/Scheme.php @@ -12,4 +12,5 @@ enum Scheme: string case Dns = 'dns'; case Ipv4 = 'ipv4'; case Ipv6 = 'ipv6'; + case Unix = 'unix'; } diff --git a/src/Client/Resolver/Target.php b/src/Client/Resolver/Target.php index c689bd3..14d0dde 100644 --- a/src/Client/Resolver/Target.php +++ b/src/Client/Resolver/Target.php @@ -28,6 +28,7 @@ public static function parse(string $target): self return match ($scheme) { Scheme::Dns => self::parseDns($addr, $target), Scheme::Ipv4, Scheme::Ipv6 => self::parseAddressList($scheme, $addr, $target), + Scheme::Unix => self::parseUnix($addr, $target), }; } } @@ -37,7 +38,6 @@ public static function parse(string $target): self /** * @internal use {@see Target::parse()} instead - * * @param non-empty-list $addresses * @param ?non-empty-string $authority DNS server address (only for dns://authority/host form) */ @@ -117,6 +117,24 @@ private static function parseHostPort(string $target): self return new self(Scheme::Dns, [$target]); } + /** + * @param non-empty-string $addr + * @param non-empty-string $target + * @throws InvalidTarget + */ + private static function parseUnix(string $addr, string $target): self + { + if (str_starts_with($addr, '//')) { + $addr = substr($addr, 2); + } + + if ($addr === '' || $addr[0] !== '/') { + throw new InvalidTarget($target); + } + + return new self(Scheme::Unix, [$addr]); + } + /** * @phpstan-assert non-empty-string $addr * @throws InvalidTarget diff --git a/tests/Client/Resolver/TargetTest.php b/tests/Client/Resolver/TargetTest.php index a9f750d..f3cf7ba 100644 --- a/tests/Client/Resolver/TargetTest.php +++ b/tests/Client/Resolver/TargetTest.php @@ -105,6 +105,21 @@ public static function provideParseTargetCases(): iterable 'localhost:50051', new Target(Scheme::Dns, ['localhost:50051']), ]; + + yield 'unix:///path' => [ + 'unix:///var/run/grpc.sock', + new Target(Scheme::Unix, ['/var/run/grpc.sock']), + ]; + + yield 'unix:/path' => [ + 'unix:/var/run/grpc.sock', + new Target(Scheme::Unix, ['/var/run/grpc.sock']), + ]; + + yield 'unix:///tmp/test.sock' => [ + 'unix:///tmp/test.sock', + new Target(Scheme::Unix, ['/tmp/test.sock']), + ]; } /** @@ -139,5 +154,8 @@ public static function provideParseTargetThrowsCases(): iterable yield 'dns: endpoint with spaces' => ['dns:my host']; yield 'bare host with spaces' => ['my host:50051']; yield 'bare bracketed ipv6 is invalid' => ['[::1]:50051']; + yield 'unix: no path' => ['unix:']; + yield 'unix: relative path' => ['unix:relative/path.sock']; + yield 'unix:// without absolute path' => ['unix://relative']; } } From c6ef5a20321c8860ec07e767983e1adc975609ed Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 11:42:55 +0300 Subject: [PATCH 03/11] chore: implement simple load balancer mechanism --- src/Client/Address.php | 27 ++++ src/Client/Endpoint.php | 20 +++ src/Client/LoadBalancer.php | 19 +++ src/Client/LoadBalancer/PickFirst.php | 54 ++++++++ src/Client/LoadBalancer/PickFirstFactory.php | 31 +++++ src/Client/LoadBalancer/RoundRobin.php | 41 ++++++ src/Client/LoadBalancer/RoundRobinFactory.php | 26 ++++ src/Client/LoadBalancerFactory.php | 21 ++++ tests/Client/LoadBalancer/PickFirstTest.php | 118 ++++++++++++++++++ tests/Client/LoadBalancer/RoundRobinTest.php | 105 ++++++++++++++++ 10 files changed, 462 insertions(+) create mode 100644 src/Client/Address.php create mode 100644 src/Client/Endpoint.php create mode 100644 src/Client/LoadBalancer.php create mode 100644 src/Client/LoadBalancer/PickFirst.php create mode 100644 src/Client/LoadBalancer/PickFirstFactory.php create mode 100644 src/Client/LoadBalancer/RoundRobin.php create mode 100644 src/Client/LoadBalancer/RoundRobinFactory.php create mode 100644 src/Client/LoadBalancerFactory.php create mode 100644 tests/Client/LoadBalancer/PickFirstTest.php create mode 100644 tests/Client/LoadBalancer/RoundRobinTest.php diff --git a/src/Client/Address.php b/src/Client/Address.php new file mode 100644 index 0000000..6f06dee --- /dev/null +++ b/src/Client/Address.php @@ -0,0 +1,27 @@ +value; + } +} diff --git a/src/Client/Endpoint.php b/src/Client/Endpoint.php new file mode 100644 index 0000000..1075517 --- /dev/null +++ b/src/Client/Endpoint.php @@ -0,0 +1,20 @@ +address->value === $other->address->value; + } +} diff --git a/src/Client/LoadBalancer.php b/src/Client/LoadBalancer.php new file mode 100644 index 0000000..e852c4a --- /dev/null +++ b/src/Client/LoadBalancer.php @@ -0,0 +1,19 @@ + $endpoints + */ + public function refresh(array $endpoints): void; + + public function pick(): Endpoint; +} diff --git a/src/Client/LoadBalancer/PickFirst.php b/src/Client/LoadBalancer/PickFirst.php new file mode 100644 index 0000000..7a1c0f6 --- /dev/null +++ b/src/Client/LoadBalancer/PickFirst.php @@ -0,0 +1,54 @@ + $endpoints + */ + public function __construct( + array $endpoints, + private readonly Randomizer $randomizer, + ) { + $this->current = $this->doPick($endpoints); + } + + #[\Override] + public function refresh(array $endpoints): void + { + $this->current = $this->doPick($endpoints, $this->current); + } + + #[\Override] + public function pick(): Endpoint + { + return $this->current; + } + + /** + * @param non-empty-list $endpoints + */ + private function doPick(array $endpoints, ?Endpoint $current = null): Endpoint + { + /** @var non-empty-list $endpoints */ + $endpoints = $this->randomizer->shuffleArray($endpoints); + + if ($current === null || !array_any($endpoints, $current->equals(...))) { + $current = $endpoints[0]; + } + + return $current; + } +} diff --git a/src/Client/LoadBalancer/PickFirstFactory.php b/src/Client/LoadBalancer/PickFirstFactory.php new file mode 100644 index 0000000..12bedc0 --- /dev/null +++ b/src/Client/LoadBalancer/PickFirstFactory.php @@ -0,0 +1,31 @@ +randomizer); + } +} diff --git a/src/Client/LoadBalancer/RoundRobin.php b/src/Client/LoadBalancer/RoundRobin.php new file mode 100644 index 0000000..cc5418e --- /dev/null +++ b/src/Client/LoadBalancer/RoundRobin.php @@ -0,0 +1,41 @@ + $endpoints + */ + public function __construct( + private array $endpoints, + ) { + $this->count = \count($endpoints); + } + + #[\Override] + public function refresh(array $endpoints): void + { + $this->endpoints = $endpoints; + $this->count = \count($endpoints); + } + + #[\Override] + public function pick(): Endpoint + { + return $this->endpoints[$this->cursor++ % $this->count]; // @phpstan-ignore offsetAccess.notFound + } +} diff --git a/src/Client/LoadBalancer/RoundRobinFactory.php b/src/Client/LoadBalancer/RoundRobinFactory.php new file mode 100644 index 0000000..a87a6cd --- /dev/null +++ b/src/Client/LoadBalancer/RoundRobinFactory.php @@ -0,0 +1,26 @@ + $endpoints + */ + public function create(array $endpoints): LoadBalancer; +} diff --git a/tests/Client/LoadBalancer/PickFirstTest.php b/tests/Client/LoadBalancer/PickFirstTest.php new file mode 100644 index 0000000..dca3a21 --- /dev/null +++ b/tests/Client/LoadBalancer/PickFirstTest.php @@ -0,0 +1,118 @@ + $endpoints + */ + #[DataProvider('providePickAlwaysReturnsSameEndpointCases')] + public function testPickAlwaysReturnsSameEndpoint(array $endpoints): void + { + $balancer = new PickFirstFactory()->create($endpoints); + $first = $balancer->pick(); + + for ($i = 0; $i < 10; ++$i) { + self::assertTrue($first->equals($balancer->pick())); + } + } + + /** + * @return iterable}> + */ + public static function providePickAlwaysReturnsSameEndpointCases(): iterable + { + yield 'single endpoint' => [ + [new Endpoint(new Address('10.0.0.1:50051'))], + ]; + + yield 'multiple endpoints' => [ + [ + new Endpoint(new Address('10.0.0.1:50051')), + new Endpoint(new Address('10.0.0.2:50051')), + new Endpoint(new Address('10.0.0.3:50051')), + ], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + */ + #[DataProvider('provideRefreshKeepsPinnedEndpointCases')] + public function testRefreshKeepsPinnedEndpoint(array $initial, array $refreshed): void + { + $balancer = new PickFirstFactory()->create($initial); + $pinned = $balancer->pick(); + + $balancer->refresh($refreshed); + + self::assertTrue($pinned->equals($balancer->pick())); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideRefreshKeepsPinnedEndpointCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'same list' => [ + [$a, $b], [$a, $b], + ]; + + yield 'new endpoint added' => [ + [$a, $b], [$a, $b, $c], + ]; + + yield 'order changed but pinned still present' => [ + [$a, $b, $c], [$c, $b, $a], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + */ + #[DataProvider('provideRefreshSwitchesPinnedEndpointCases')] + public function testRefreshSwitchesPinnedEndpoint(array $initial, array $refreshed): void + { + $balancer = new PickFirstFactory()->create($initial); + $pinned = $balancer->pick(); + + $balancer->refresh($refreshed); + + self::assertFalse($pinned->equals($balancer->pick())); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideRefreshSwitchesPinnedEndpointCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'pinned removed, new endpoints' => [ + [$a], [$b, $c], + ]; + + yield 'completely different list' => [ + [$a, $b], [$c], + ]; + } +} diff --git a/tests/Client/LoadBalancer/RoundRobinTest.php b/tests/Client/LoadBalancer/RoundRobinTest.php new file mode 100644 index 0000000..2fd48ef --- /dev/null +++ b/tests/Client/LoadBalancer/RoundRobinTest.php @@ -0,0 +1,105 @@ + $endpoints + * @param list $expectedPicks + */ + #[DataProvider('providePickCyclesCases')] + public function testPickCycles(array $endpoints, array $expectedPicks): void + { + $balancer = new RoundRobinFactory()->create($endpoints); + + foreach ($expectedPicks as $expected) { + self::assertTrue($expected->equals($balancer->pick())); + } + } + + /** + * @return iterable, list}> + */ + public static function providePickCyclesCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'single endpoint' => [ + [$a], + [$a, $a, $a], + ]; + + yield 'two endpoints' => [ + [$a, $b], + [$a, $b, $a, $b], + ]; + + yield 'three endpoints, full cycle twice' => [ + [$a, $b, $c], + [$a, $b, $c, $a, $b, $c], + ]; + } + + /** + * @param non-empty-list $initial + * @param non-empty-list $refreshed + * @param list $expectedAfterRefresh + */ + #[DataProvider('provideRefreshCases')] + public function testRefresh(array $initial, int $picksBeforeRefresh, array $refreshed, array $expectedAfterRefresh): void + { + $balancer = new RoundRobinFactory()->create($initial); + + for ($i = 0; $i < $picksBeforeRefresh; ++$i) { + $balancer->pick(); + } + + $balancer->refresh($refreshed); + + foreach ($expectedAfterRefresh as $expected) { + self::assertTrue($expected->equals($balancer->pick())); + } + } + + /** + * @return iterable, int, non-empty-list, list}> + */ + public static function provideRefreshCases(): iterable + { + $a = new Endpoint(new Address('10.0.0.1:50051')); + $b = new Endpoint(new Address('10.0.0.2:50051')); + $c = new Endpoint(new Address('10.0.0.3:50051')); + + yield 'same list, continues rotation' => [ + [$a, $b, $c], 1, + [$a, $b, $c], + [$b, $c, $a], + ]; + + yield 'completely new list' => [ + [$a, $b], 0, + [$c], + [$c, $c, $c], + ]; + + yield 'list grew' => [ + [$a], 0, + [$a, $b, $c], + [$a, $b, $c], + ]; + } +} From 257051b13442a08e89b050ca12b4b278a0caeb92 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 14:47:03 +0300 Subject: [PATCH 04/11] chore: fix cs --- src/Client/{Resolver => }/InvalidTarget.php | 2 +- src/Client/{Resolver => }/Scheme.php | 2 +- src/Client/{Resolver => }/Target.php | 2 +- tests/Client/LoadBalancer/RoundRobinTest.php | 1 - tests/Client/{Resolver => }/TargetTest.php | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) rename src/Client/{Resolver => }/InvalidTarget.php (87%) rename src/Client/{Resolver => }/Scheme.php (81%) rename src/Client/{Resolver => }/Target.php (99%) rename tests/Client/{Resolver => }/TargetTest.php (99%) diff --git a/src/Client/Resolver/InvalidTarget.php b/src/Client/InvalidTarget.php similarity index 87% rename from src/Client/Resolver/InvalidTarget.php rename to src/Client/InvalidTarget.php index b1fafd3..5a27e20 100644 --- a/src/Client/Resolver/InvalidTarget.php +++ b/src/Client/InvalidTarget.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Thesis\Grpc\Client\Resolver; +namespace Thesis\Grpc\Client; use Thesis\Grpc\GrpcException; diff --git a/src/Client/Resolver/Scheme.php b/src/Client/Scheme.php similarity index 81% rename from src/Client/Resolver/Scheme.php rename to src/Client/Scheme.php index 3d1a718..f8327b1 100644 --- a/src/Client/Resolver/Scheme.php +++ b/src/Client/Scheme.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Thesis\Grpc\Client\Resolver; +namespace Thesis\Grpc\Client; /** * @api diff --git a/src/Client/Resolver/Target.php b/src/Client/Target.php similarity index 99% rename from src/Client/Resolver/Target.php rename to src/Client/Target.php index 14d0dde..d0374e4 100644 --- a/src/Client/Resolver/Target.php +++ b/src/Client/Target.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Thesis\Grpc\Client\Resolver; +namespace Thesis\Grpc\Client; /** * @api diff --git a/tests/Client/LoadBalancer/RoundRobinTest.php b/tests/Client/LoadBalancer/RoundRobinTest.php index 2fd48ef..22ec2dd 100644 --- a/tests/Client/LoadBalancer/RoundRobinTest.php +++ b/tests/Client/LoadBalancer/RoundRobinTest.php @@ -6,7 +6,6 @@ use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\Attributes\DataProvider; -use PHPUnit\Framework\Attributes\TestWith; use PHPUnit\Framework\TestCase; use Thesis\Grpc\Client\Address; use Thesis\Grpc\Client\Endpoint; diff --git a/tests/Client/Resolver/TargetTest.php b/tests/Client/TargetTest.php similarity index 99% rename from tests/Client/Resolver/TargetTest.php rename to tests/Client/TargetTest.php index f3cf7ba..cf163b2 100644 --- a/tests/Client/Resolver/TargetTest.php +++ b/tests/Client/TargetTest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Thesis\Grpc\Client\Resolver; +namespace Thesis\Grpc\Client; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\Attributes\DataProvider; From 2d0dad8dc43ff04d912426e6bf773aae99fe4b0d Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 15:12:36 +0300 Subject: [PATCH 05/11] chore: add PickContext to allow implement more complex lb like hashing lb --- src/Client/LoadBalancer.php | 2 +- src/Client/LoadBalancer/PickFirst.php | 3 ++- src/Client/LoadBalancer/RoundRobin.php | 3 ++- src/Client/PickContext.php | 21 ++++++++++++++++++++ tests/Client/LoadBalancer/PickFirstTest.php | 19 ++++++++++++------ tests/Client/LoadBalancer/RoundRobinTest.php | 13 +++++++++--- 6 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 src/Client/PickContext.php diff --git a/src/Client/LoadBalancer.php b/src/Client/LoadBalancer.php index e852c4a..c71eb69 100644 --- a/src/Client/LoadBalancer.php +++ b/src/Client/LoadBalancer.php @@ -15,5 +15,5 @@ interface LoadBalancer */ public function refresh(array $endpoints): void; - public function pick(): Endpoint; + public function pick(PickContext $context): Endpoint; } diff --git a/src/Client/LoadBalancer/PickFirst.php b/src/Client/LoadBalancer/PickFirst.php index 7a1c0f6..7ff7591 100644 --- a/src/Client/LoadBalancer/PickFirst.php +++ b/src/Client/LoadBalancer/PickFirst.php @@ -7,6 +7,7 @@ use Random\Randomizer; use Thesis\Grpc\Client\Endpoint; use Thesis\Grpc\Client\LoadBalancer; +use Thesis\Grpc\Client\PickContext; /** * @api @@ -32,7 +33,7 @@ public function refresh(array $endpoints): void } #[\Override] - public function pick(): Endpoint + public function pick(PickContext $context): Endpoint { return $this->current; } diff --git a/src/Client/LoadBalancer/RoundRobin.php b/src/Client/LoadBalancer/RoundRobin.php index cc5418e..c08dea0 100644 --- a/src/Client/LoadBalancer/RoundRobin.php +++ b/src/Client/LoadBalancer/RoundRobin.php @@ -6,6 +6,7 @@ use Thesis\Grpc\Client\Endpoint; use Thesis\Grpc\Client\LoadBalancer; +use Thesis\Grpc\Client\PickContext; /** * @api @@ -34,7 +35,7 @@ public function refresh(array $endpoints): void } #[\Override] - public function pick(): Endpoint + public function pick(PickContext $context): Endpoint { return $this->endpoints[$this->cursor++ % $this->count]; // @phpstan-ignore offsetAccess.notFound } diff --git a/src/Client/PickContext.php b/src/Client/PickContext.php new file mode 100644 index 0000000..e6666d9 --- /dev/null +++ b/src/Client/PickContext.php @@ -0,0 +1,21 @@ +create($endpoints); - $first = $balancer->pick(); + $first = $balancer->pick(self::context()); for ($i = 0; $i < 10; ++$i) { - self::assertTrue($first->equals($balancer->pick())); + self::assertTrue($first->equals($balancer->pick(self::context()))); } } @@ -54,11 +56,11 @@ public static function providePickAlwaysReturnsSameEndpointCases(): iterable public function testRefreshKeepsPinnedEndpoint(array $initial, array $refreshed): void { $balancer = new PickFirstFactory()->create($initial); - $pinned = $balancer->pick(); + $pinned = $balancer->pick(self::context()); $balancer->refresh($refreshed); - self::assertTrue($pinned->equals($balancer->pick())); + self::assertTrue($pinned->equals($balancer->pick(self::context()))); } /** @@ -91,11 +93,11 @@ public static function provideRefreshKeepsPinnedEndpointCases(): iterable public function testRefreshSwitchesPinnedEndpoint(array $initial, array $refreshed): void { $balancer = new PickFirstFactory()->create($initial); - $pinned = $balancer->pick(); + $pinned = $balancer->pick(self::context()); $balancer->refresh($refreshed); - self::assertFalse($pinned->equals($balancer->pick())); + self::assertFalse($pinned->equals($balancer->pick(self::context()))); } /** @@ -115,4 +117,9 @@ public static function provideRefreshSwitchesPinnedEndpointCases(): iterable [$a, $b], [$c], ]; } + + private static function context(): PickContext + { + return new PickContext('/test.Service/Method', new Metadata()); + } } diff --git a/tests/Client/LoadBalancer/RoundRobinTest.php b/tests/Client/LoadBalancer/RoundRobinTest.php index 22ec2dd..1c0fd4d 100644 --- a/tests/Client/LoadBalancer/RoundRobinTest.php +++ b/tests/Client/LoadBalancer/RoundRobinTest.php @@ -9,6 +9,8 @@ use PHPUnit\Framework\TestCase; use Thesis\Grpc\Client\Address; use Thesis\Grpc\Client\Endpoint; +use Thesis\Grpc\Client\PickContext; +use Thesis\Grpc\Metadata; #[CoversClass(RoundRobin::class)] #[CoversClass(RoundRobinFactory::class)] @@ -24,7 +26,7 @@ public function testPickCycles(array $endpoints, array $expectedPicks): void $balancer = new RoundRobinFactory()->create($endpoints); foreach ($expectedPicks as $expected) { - self::assertTrue($expected->equals($balancer->pick())); + self::assertTrue($expected->equals($balancer->pick(self::context()))); } } @@ -64,13 +66,13 @@ public function testRefresh(array $initial, int $picksBeforeRefresh, array $refr $balancer = new RoundRobinFactory()->create($initial); for ($i = 0; $i < $picksBeforeRefresh; ++$i) { - $balancer->pick(); + $balancer->pick(self::context()); } $balancer->refresh($refreshed); foreach ($expectedAfterRefresh as $expected) { - self::assertTrue($expected->equals($balancer->pick())); + self::assertTrue($expected->equals($balancer->pick(self::context()))); } } @@ -101,4 +103,9 @@ public static function provideRefreshCases(): iterable [$a, $b, $c], ]; } + + private static function context(): PickContext + { + return new PickContext('/test.Service/Method', new Metadata()); + } } From c1f30d306357caefd360ac13653320dbde9bdaf1 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 19:27:48 +0300 Subject: [PATCH 06/11] chore: change string addresses to TargetAddress to hold port --- composer.json | 2 +- src/Client/Scheme.php | 1 + src/Client/Target.php | 73 ++++++++++----------- src/Client/TargetAddress.php | 35 +++++++++++ src/Internal/Protocol/Frame.php | 2 +- tests/Client/TargetTest.php | 108 +++++++++++++++++--------------- 6 files changed, 133 insertions(+), 88 deletions(-) create mode 100644 src/Client/TargetAddress.php diff --git a/composer.json b/composer.json index 3434865..026145c 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,6 @@ ], "require": { "php": "^8.4", - "ext-ctype": "*", "amphp/amp": "^3.1", "amphp/byte-stream": "^2.1", "amphp/http": "^2.1", @@ -28,6 +27,7 @@ "amphp/pipeline": "^1.2", "amphp/socket": "^2.3", "amphp/sync": "^2.3", + "league/uri": "^7.8", "psr/log": "^3.0", "revolt/event-loop": "1.0.8", "thesis/endian": "^0.3.0", diff --git a/src/Client/Scheme.php b/src/Client/Scheme.php index f8327b1..e0a6026 100644 --- a/src/Client/Scheme.php +++ b/src/Client/Scheme.php @@ -13,4 +13,5 @@ enum Scheme: string case Ipv4 = 'ipv4'; case Ipv6 = 'ipv6'; case Unix = 'unix'; + case Passthrough = 'passthrough'; } diff --git a/src/Client/Target.php b/src/Client/Target.php index d0374e4..893a432 100644 --- a/src/Client/Target.php +++ b/src/Client/Target.php @@ -4,6 +4,8 @@ namespace Thesis\Grpc\Client; +use League\Uri\Uri; + /** * @api * @see https://github.com/grpc/grpc/blob/master/doc/naming.md @@ -26,8 +28,8 @@ public static function parse(string $target): self } return match ($scheme) { - Scheme::Dns => self::parseDns($addr, $target), - Scheme::Ipv4, Scheme::Ipv6 => self::parseAddressList($scheme, $addr, $target), + Scheme::Dns, Scheme::Passthrough => self::parseDns($addr, $target, $scheme), + Scheme::Ipv4, Scheme::Ipv6 => new self($scheme, self::parseAddresses($addr, $target)), Scheme::Unix => self::parseUnix($addr, $target), }; } @@ -38,7 +40,7 @@ public static function parse(string $target): self /** * @internal use {@see Target::parse()} instead - * @param non-empty-list $addresses + * @param non-empty-list $addresses * @param ?non-empty-string $authority DNS server address (only for dns://authority/host form) */ public function __construct( @@ -52,7 +54,7 @@ public function __construct( * @param non-empty-string $target * @throws InvalidTarget */ - private static function parseDns(string $addr, string $target): self + private static function parseDns(string $addr, string $target, Scheme $scheme = Scheme::Dns): self { $authority = null; @@ -74,47 +76,34 @@ private static function parseDns(string $addr, string $target): self } } - self::validateDnsEndpoint($addr, $target); // @phpstan-ignore staticMethod.alreadyNarrowedType - - return new self(Scheme::Dns, [$addr], $authority); + return new self( + $scheme, + self::parseAddresses($addr, $target), + $authority, + ); } /** * @param non-empty-string $addr + * @param non-empty-string $target + * @return non-empty-list * @throws InvalidTarget */ - private static function parseAddressList(Scheme $scheme, string $addr, string $original): self + private static function parseAddresses(string $addr, string $target): array { - $addresses = explode(',', $addr); - $result = []; - - foreach ($addresses as $address) { - $address = trim($address); - - if ($address === '' || str_contains($address, ' ')) { - throw new InvalidTarget($original); - } - - $result[] = $address; - } - - return new self($scheme, $result); + return array_map( + static fn(string $address) => self::parseAddress(trim($address), $target), + explode(',', $addr), + ); } /** + * @param non-empty-string $target * @throws InvalidTarget */ private static function parseHostPort(string $target): self { - $colon = strpos($target, ':'); - - if ($colon !== false && !ctype_digit(substr($target, $colon + 1))) { - throw new InvalidTarget($target); - } - - self::validateDnsEndpoint($target); - - return new self(Scheme::Dns, [$target]); + return new self(Scheme::Dns, self::parseAddresses($target, $target)); } /** @@ -132,17 +121,29 @@ private static function parseUnix(string $addr, string $target): self throw new InvalidTarget($target); } - return new self(Scheme::Unix, [$addr]); + return new self(Scheme::Unix, [new TargetAddress($addr, 0)]); } /** - * @phpstan-assert non-empty-string $addr + * @param non-empty-string $target * @throws InvalidTarget */ - private static function validateDnsEndpoint(string $addr, ?string $target = null): void + private static function parseAddress(string $addr, string $target): TargetAddress { - if ($addr === '' || str_contains($addr, '/') || str_contains($addr, ' ')) { - throw new InvalidTarget($target ?? $addr); + $uri = Uri::parse("tcp://{$addr}") ?? throw new InvalidTarget($target); + + $host = $uri->getHost() ?? ''; + + if ($host === '') { + throw new InvalidTarget($target); + } + + $port = $uri->getPort() ?? 0; + + if ($port < 1 || $port > 65_535 || $uri->getPath() !== '') { + throw new InvalidTarget($target); } + + return new TargetAddress($host, $port); } } diff --git a/src/Client/TargetAddress.php b/src/Client/TargetAddress.php new file mode 100644 index 0000000..e932e89 --- /dev/null +++ b/src/Client/TargetAddress.php @@ -0,0 +1,35 @@ + $port + */ + public function __construct( + public string $host, + public int $port, + ) {} + + /** + * @return non-empty-string + */ + #[\Override] + public function __toString(): string + { + $host = $this->host; + + if ($this->port > 0) { + $host = "{$host}:{$this->port}"; + } + + return $host; + } +} diff --git a/src/Internal/Protocol/Frame.php b/src/Internal/Protocol/Frame.php index 58d5995..b33e758 100644 --- a/src/Internal/Protocol/Frame.php +++ b/src/Internal/Protocol/Frame.php @@ -18,7 +18,7 @@ public function __construct( ) {} } -const byteOrder = Endian\Order::big; +const byteOrder = Endian\Order::Big; /** * @internal diff --git a/tests/Client/TargetTest.php b/tests/Client/TargetTest.php index cf163b2..5bacf49 100644 --- a/tests/Client/TargetTest.php +++ b/tests/Client/TargetTest.php @@ -26,99 +26,98 @@ public function testParseTarget(string $input, Target $expected): void */ public static function provideParseTargetCases(): iterable { - yield 'dns:host' => [ - 'dns:myhost', - new Target(Scheme::Dns, ['myhost']), - ]; - yield 'dns:host:port' => [ 'dns:myhost:50051', - new Target(Scheme::Dns, ['myhost:50051']), - ]; - - yield 'dns:///host' => [ - 'dns:///myhost', - new Target(Scheme::Dns, ['myhost']), + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), ]; yield 'dns:///host:port' => [ 'dns:///myhost:50051', - new Target(Scheme::Dns, ['myhost:50051']), + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), ]; - yield 'dns://authority/host' => [ - 'dns://authority/myhost', - new Target(Scheme::Dns, ['myhost'], 'authority'), + yield 'dns://authority/host:port' => [ + 'dns://authority:53/myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'authority:53'), ]; - yield 'dns://authority:port/host:port' => [ - 'dns://authority:53/myhost:50051', - new Target(Scheme::Dns, ['myhost:50051'], 'authority:53'), + yield 'dns://authority/host:port without authority port' => [ + 'dns://authority/myhost:50051', + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'authority'), ]; yield 'ipv4:single address' => [ 'ipv4:192.168.0.1:50051', - new Target(Scheme::Ipv4, ['192.168.0.1:50051']), + new Target(Scheme::Ipv4, [new TargetAddress('192.168.0.1', 50_051)]), ]; yield 'ipv4:multiple addresses' => [ 'ipv4:192.168.0.1:50051,192.168.0.2:50052', - new Target(Scheme::Ipv4, ['192.168.0.1:50051', '192.168.0.2:50052']), + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ]), ]; yield 'ipv4:multiple addresses with spaces around comma' => [ 'ipv4:192.168.0.1:50051, 192.168.0.2:50052', - new Target(Scheme::Ipv4, ['192.168.0.1:50051', '192.168.0.2:50052']), - ]; - - yield 'ipv4:address without port' => [ - 'ipv4:10.0.0.1', - new Target(Scheme::Ipv4, ['10.0.0.1']), + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ]), ]; yield 'ipv6:single address with port' => [ 'ipv6:[::1]:50051', - new Target(Scheme::Ipv6, ['[::1]:50051']), + new Target(Scheme::Ipv6, [new TargetAddress('[::1]', 50_051)]), ]; yield 'ipv6:multiple addresses' => [ 'ipv6:[::1]:50051,[::2]:50052', - new Target(Scheme::Ipv6, ['[::1]:50051', '[::2]:50052']), - ]; - - yield 'ipv6:address without port' => [ - 'ipv6:::1', - new Target(Scheme::Ipv6, ['::1']), - ]; - - yield 'bare host' => [ - 'myhost', - new Target(Scheme::Dns, ['myhost']), + new Target(Scheme::Ipv6, [ + new TargetAddress('[::1]', 50_051), + new TargetAddress('[::2]', 50_052), + ]), ]; yield 'bare host:port' => [ 'myhost:50051', - new Target(Scheme::Dns, ['myhost:50051']), + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), ]; yield 'bare localhost:port' => [ 'localhost:50051', - new Target(Scheme::Dns, ['localhost:50051']), + new Target(Scheme::Dns, [new TargetAddress('localhost', 50_051)]), ]; yield 'unix:///path' => [ 'unix:///var/run/grpc.sock', - new Target(Scheme::Unix, ['/var/run/grpc.sock']), + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)]), ]; yield 'unix:/path' => [ 'unix:/var/run/grpc.sock', - new Target(Scheme::Unix, ['/var/run/grpc.sock']), + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)]), ]; yield 'unix:///tmp/test.sock' => [ 'unix:///tmp/test.sock', - new Target(Scheme::Unix, ['/tmp/test.sock']), + new Target(Scheme::Unix, [new TargetAddress('/tmp/test.sock', 0)]), + ]; + + yield 'passthrough:///host:port' => [ + 'passthrough:///myhost:50051', + new Target(Scheme::Passthrough, [new TargetAddress('myhost', 50_051)]), + ]; + + yield 'passthrough:host:port' => [ + 'passthrough:myhost:50051', + new Target(Scheme::Passthrough, [new TargetAddress('myhost', 50_051)]), + ]; + + yield 'bare bracketed ipv6' => [ + '[::1]:50051', + new Target(Scheme::Dns, [new TargetAddress('[::1]', 50_051)]), ]; } @@ -138,24 +137,33 @@ public function testParseTargetThrows(string $input): void public static function provideParseTargetThrowsCases(): iterable { yield 'empty string' => ['']; - yield 'uppercase dns scheme is invalid' => ['DNS:myhost']; + yield 'uppercase dns scheme is invalid' => ['DNS:myhost:50051']; yield 'uppercase ipv4 scheme is invalid' => ['IPV4:192.168.0.1:50051']; yield 'uppercase ipv6 scheme is invalid' => ['IPV6:[::1]:50051']; yield 'ipv4: no address' => ['ipv4:']; yield 'ipv6: no address' => ['ipv6:']; yield 'dns:/// empty host' => ['dns:///']; - yield 'unknown scheme' => ['etcd:myhost']; - yield 'ipv4: trailing comma' => ['ipv4:192.168.0.1,']; - yield 'ipv4: leading comma' => ['ipv4:,192.168.0.1']; + yield 'unknown scheme' => ['etcd:myhost:50051']; + yield 'ipv4: trailing comma' => ['ipv4:192.168.0.1:50051,']; + yield 'ipv4: leading comma' => ['ipv4:,192.168.0.1:50051']; yield 'http scheme' => ['http://localhost:50051']; yield 'https scheme' => ['https://example.com:443']; yield 'dns://host without slash' => ['dns://myhost']; - yield 'dns:///host/extra' => ['dns:///myhost/extra']; - yield 'dns: endpoint with spaces' => ['dns:my host']; + yield 'dns:///host/extra' => ['dns:///myhost:50051/extra']; + yield 'dns: endpoint with spaces' => ['dns:my host:50051']; yield 'bare host with spaces' => ['my host:50051']; - yield 'bare bracketed ipv6 is invalid' => ['[::1]:50051']; yield 'unix: no path' => ['unix:']; yield 'unix: relative path' => ['unix:relative/path.sock']; yield 'unix:// without absolute path' => ['unix://relative']; + yield 'passthrough:/// empty host' => ['passthrough:///']; + yield 'passthrough: no address' => ['passthrough:']; + yield 'dns: host without port' => ['dns:myhost']; + yield 'dns:///host without port' => ['dns:///myhost']; + yield 'ipv4: address without port' => ['ipv4:10.0.0.1']; + yield 'ipv6: address without port' => ['ipv6:::1']; + yield 'ipv6: bracketed without port' => ['ipv6:[::1]']; + yield 'bare host without port' => ['myhost']; + yield 'passthrough: host without port' => ['passthrough:myhost']; + yield 'port out of range' => ['dns:myhost:99999']; } } From ba70bad0b69da870c712b5a868b55a788d2f8edc Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Mon, 6 Apr 2026 19:39:43 +0300 Subject: [PATCH 07/11] chore: fix cs and ci --- composer.json | 2 +- src/Client/Target.php | 47 +++++++++++++++++-------------------------- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/composer.json b/composer.json index 026145c..f3fb86d 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,7 @@ "league/uri": "^7.8", "psr/log": "^3.0", "revolt/event-loop": "1.0.8", - "thesis/endian": "^0.3.0", + "thesis/endian": ">= 0.3.2", "thesis/googleapis-rpc-types": "^0.1.6", "thesis/package-version": "^0.1.2", "thesis/protobuf": "^0.1.8", diff --git a/src/Client/Target.php b/src/Client/Target.php index 893a432..f201466 100644 --- a/src/Client/Target.php +++ b/src/Client/Target.php @@ -30,12 +30,12 @@ public static function parse(string $target): self return match ($scheme) { Scheme::Dns, Scheme::Passthrough => self::parseDns($addr, $target, $scheme), Scheme::Ipv4, Scheme::Ipv6 => new self($scheme, self::parseAddresses($addr, $target)), - Scheme::Unix => self::parseUnix($addr, $target), + Scheme::Unix => new self($scheme, [self::parseUnix($addr, $target)]), }; } } - return self::parseHostPort($target); + return new self(Scheme::Dns, self::parseAddresses($target)); } /** @@ -54,7 +54,7 @@ public function __construct( * @param non-empty-string $target * @throws InvalidTarget */ - private static function parseDns(string $addr, string $target, Scheme $scheme = Scheme::Dns): self + private static function parseDns(string $addr, string $target, Scheme $scheme): self { $authority = null; @@ -83,35 +83,12 @@ private static function parseDns(string $addr, string $target, Scheme $scheme = ); } - /** - * @param non-empty-string $addr - * @param non-empty-string $target - * @return non-empty-list - * @throws InvalidTarget - */ - private static function parseAddresses(string $addr, string $target): array - { - return array_map( - static fn(string $address) => self::parseAddress(trim($address), $target), - explode(',', $addr), - ); - } - - /** - * @param non-empty-string $target - * @throws InvalidTarget - */ - private static function parseHostPort(string $target): self - { - return new self(Scheme::Dns, self::parseAddresses($target, $target)); - } - /** * @param non-empty-string $addr * @param non-empty-string $target * @throws InvalidTarget */ - private static function parseUnix(string $addr, string $target): self + private static function parseUnix(string $addr, string $target): TargetAddress { if (str_starts_with($addr, '//')) { $addr = substr($addr, 2); @@ -121,7 +98,21 @@ private static function parseUnix(string $addr, string $target): self throw new InvalidTarget($target); } - return new self(Scheme::Unix, [new TargetAddress($addr, 0)]); + return new TargetAddress($addr, 0); + } + + /** + * @param non-empty-string $addr + * @param ?non-empty-string $target + * @return non-empty-list + * @throws InvalidTarget + */ + private static function parseAddresses(string $addr, ?string $target = null): array + { + return array_map( + static fn(string $address) => self::parseAddress(trim($address), $target ?? $addr), + explode(',', $addr), + ); } /** From cc9f287a05ef7b378847d6293714c5876ed96d39 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Tue, 7 Apr 2026 09:23:50 +0300 Subject: [PATCH 08/11] chore: urldecode in Target --- src/Client/Target.php | 1 + tests/Client/TargetTest.php | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/src/Client/Target.php b/src/Client/Target.php index f201466..56c0acb 100644 --- a/src/Client/Target.php +++ b/src/Client/Target.php @@ -121,6 +121,7 @@ private static function parseAddresses(string $addr, ?string $target = null): ar */ private static function parseAddress(string $addr, string $target): TargetAddress { + $addr = urldecode($addr); $uri = Uri::parse("tcp://{$addr}") ?? throw new InvalidTarget($target); $host = $uri->getHost() ?? ''; diff --git a/tests/Client/TargetTest.php b/tests/Client/TargetTest.php index 5bacf49..b02aa1b 100644 --- a/tests/Client/TargetTest.php +++ b/tests/Client/TargetTest.php @@ -46,6 +46,16 @@ public static function provideParseTargetCases(): iterable new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)], 'authority'), ]; + yield 'dns:///ipv6 with brackets' => [ + 'dns:///[2001:db8:85a3:8d3:1319:8a2e:370:7348]:443', + new Target(Scheme::Dns, [new TargetAddress('[2001:db8:85a3:8d3:1319:8a2e:370:7348]', 443)]), + ]; + + yield 'dns:///ipv6 percent-encoded brackets' => [ + 'dns:///%5B2001:db8:85a3:8d3:1319:8a2e:370:7348%5D:443', + new Target(Scheme::Dns, [new TargetAddress('[2001:db8:85a3:8d3:1319:8a2e:370:7348]', 443)]), + ]; + yield 'ipv4:single address' => [ 'ipv4:192.168.0.1:50051', new Target(Scheme::Ipv4, [new TargetAddress('192.168.0.1', 50_051)]), From 6286af123d400bf4758e5bc24b92e5d0465e3d51 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Tue, 7 Apr 2026 15:05:35 +0300 Subject: [PATCH 09/11] chore: implement resolvers --- src/Client/EndpointResolver.php | 19 +++++++++++++++++++ src/Client/EndpointResolverListener.php | 13 +++++++++++++ src/Client/Resolution.php | 18 ++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 src/Client/EndpointResolver.php create mode 100644 src/Client/EndpointResolverListener.php create mode 100644 src/Client/Resolution.php diff --git a/src/Client/EndpointResolver.php b/src/Client/EndpointResolver.php new file mode 100644 index 0000000..9d3d77b --- /dev/null +++ b/src/Client/EndpointResolver.php @@ -0,0 +1,19 @@ + $endpoints + */ + public function __construct( + public array $endpoints, + ) {} +} From da6119db541cfa9294033332417d4b500177d199 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Tue, 7 Apr 2026 15:08:44 +0300 Subject: [PATCH 10/11] chore: add StaticResolver --- .../EndpointResolver/StaticResolver.php | 31 +++++++++ .../EndpointResolver/StaticResolverTest.php | 69 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 src/Client/EndpointResolver/StaticResolver.php create mode 100644 tests/Client/EndpointResolver/StaticResolverTest.php diff --git a/src/Client/EndpointResolver/StaticResolver.php b/src/Client/EndpointResolver/StaticResolver.php new file mode 100644 index 0000000..9a94be6 --- /dev/null +++ b/src/Client/EndpointResolver/StaticResolver.php @@ -0,0 +1,31 @@ + new Endpoint(new Address((string) $address)), + $target->addresses, + ), + ); + } +} diff --git a/tests/Client/EndpointResolver/StaticResolverTest.php b/tests/Client/EndpointResolver/StaticResolverTest.php new file mode 100644 index 0000000..a08dfa5 --- /dev/null +++ b/tests/Client/EndpointResolver/StaticResolverTest.php @@ -0,0 +1,69 @@ + $endpoints + */ + #[DataProvider('provideResolveCases')] + public function testResolve(Target $target, array $endpoints): void + { + $resolver = new StaticResolver(); + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolution = $resolver->resolve($target, $listener, new NullCancellation()); + + self::assertEquals($endpoints, $resolution->endpoints); + } + + /** + * @return iterable}> + */ + public static function provideResolveCases(): iterable + { + yield 'ipv4: single address' => [ + new Target(Scheme::Ipv4, [new TargetAddress('192.168.0.1', 50_051)]), + [new Endpoint(new Address('192.168.0.1:50051'))], + ]; + + yield 'ipv4: multiple addresses' => [ + new Target(Scheme::Ipv4, [ + new TargetAddress('192.168.0.1', 50_051), + new TargetAddress('192.168.0.2', 50_052), + ]), + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('192.168.0.2:50052')), + ], + ]; + + yield 'ipv6: single address' => [ + new Target(Scheme::Ipv6, [new TargetAddress('[::1]', 50_051)]), + [new Endpoint(new Address('[::1]:50051'))], + ]; + + yield 'unix: socket path' => [ + new Target(Scheme::Unix, [new TargetAddress('/var/run/grpc.sock', 0)]), + [new Endpoint(new Address('/var/run/grpc.sock'))], + ]; + } +} From f5971cf4ebebd188993bfd860a80fe1fac85598b Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Tue, 7 Apr 2026 15:09:25 +0300 Subject: [PATCH 11/11] chore: add DnsResolver --- Makefile | 1 + composer.json | 2 + src/Client/EndpointResolver/DnsResolver.php | 169 ++++++++++++++++ .../EndpointResolver/DnsResolverTest.php | 182 ++++++++++++++++++ 4 files changed, 354 insertions(+) create mode 100644 src/Client/EndpointResolver/DnsResolver.php create mode 100644 tests/Client/EndpointResolver/DnsResolverTest.php diff --git a/Makefile b/Makefile index a0102f3..488aee1 100644 --- a/Makefile +++ b/Makefile @@ -135,6 +135,7 @@ check: fixer-check rector-check composer-validate composer-normalize-check deps- compile-test-stub: docker run --rm \ + --pull always \ --user $(CONTAINER_USER) \ -v $(PWD):/workspace \ -w /workspace \ diff --git a/composer.json b/composer.json index f3fb86d..0b199bb 100644 --- a/composer.json +++ b/composer.json @@ -21,6 +21,8 @@ "php": "^8.4", "amphp/amp": "^3.1", "amphp/byte-stream": "^2.1", + "amphp/cache": "^2.0", + "amphp/dns": "^2.4", "amphp/http": "^2.1", "amphp/http-client": "^5.3", "amphp/http-server": "^3.4", diff --git a/src/Client/EndpointResolver/DnsResolver.php b/src/Client/EndpointResolver/DnsResolver.php new file mode 100644 index 0000000..dcb0616 --- /dev/null +++ b/src/Client/EndpointResolver/DnsResolver.php @@ -0,0 +1,169 @@ + $cache + */ + public function __construct( + private ?AmphpDnsResolver $dns = null, + private ?Cache $cache = null, + private float $minResolveInterval = self::MIN_RESOLVE_INTERVAL, + private float $maxResolveInterval = self::MAX_RESOLVE_INTERVAL, + ) {} + + #[\Override] + public function resolve( + Target $target, + EndpointResolverListener $listener, + Cancellation $cancellation, + ): Resolution { + $resolver = $this->dns ?? $this->configureResolver($target); + + $result = $this->resolveNow( + $resolver, + $target, + $cancellation, + ); + + $resolveTTL = $this->computeResolveTTL($result); + + if ($resolveTTL > 0) { + EventLoop::queue( + $this->resolveLater(...), + $resolver, + $listener, + $target, + $resolveTTL, + $cancellation, + ); + } + + return $result->resolution; + } + + private function resolveNow( + AmphpDnsResolver $resolver, + Target $target, + Cancellation $cancellation, + ): ResolveResult { + $endpoints = []; + $ttl = null; + + foreach ($target->addresses as $address) { + $records = $resolver->resolve($address->host, cancellation: $cancellation); + + foreach ($records as $record) { + $ip = $record->getValue(); + + if ($record->getType() === DnsRecord::AAAA) { + $ip = "[{$ip}]"; + } + + $endpoints[] = new Endpoint( + new Address("{$ip}:{$address->port}"), + ); + + if ($record->getTtl() !== null) { + $ttl = min($record->getTtl(), $ttl ?? \PHP_INT_MAX); + } + } + } + + return new ResolveResult( + new Resolution($endpoints), + $ttl, + ); + } + + private function resolveLater( + AmphpDnsResolver $resolver, + EndpointResolverListener $listener, + Target $target, + float $ttl, + Cancellation $cancellation, + ): void { + while (true) { + $suspension = EventLoop::getSuspension(); + $timerId = EventLoop::delay($ttl, $suspension->resume(...)); + $cancellationId = $cancellation->subscribe($suspension->throw(...)); + + try { + $suspension->suspend(); + + $result = $this->resolveNow($resolver, $target, $cancellation); + $listener->onResolve($result->resolution); + + $ttl = $this->computeResolveTTL($result); + + if ($ttl <= 0) { + return; + } + } catch (CancelledException) { + return; + } catch (\Throwable $e) { + $listener->onResolve($e); + } finally { + EventLoop::cancel($timerId); + $cancellation->unsubscribe($cancellationId); + } + } + } + + private function configureResolver(Target $target): AmphpDnsResolver + { + $configLoader = null; + if ($target->authority !== null) { + $configLoader = new StaticDnsConfigLoader( + new DnsConfig([$target->authority]), + ); + } + + return new Rfc1035StubDnsResolver( + $this->cache, + $configLoader, + ); + } + + private function computeResolveTTL(ResolveResult $result): float + { + return max( + $this->minResolveInterval, + min($this->maxResolveInterval, $result->ttl ?? $this->minResolveInterval), + ); + } +} + +final readonly class ResolveResult +{ + public function __construct( + public Resolution $resolution, + public ?float $ttl = null, + ) {} +} diff --git a/tests/Client/EndpointResolver/DnsResolverTest.php b/tests/Client/EndpointResolver/DnsResolverTest.php new file mode 100644 index 0000000..c7ff4cb --- /dev/null +++ b/tests/Client/EndpointResolver/DnsResolverTest.php @@ -0,0 +1,182 @@ + $records + * @param non-empty-list $endpoints + */ + #[DataProvider('provideResolveCases')] + public function testResolve(Target $target, array $records, array $endpoints): void + { + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::once()) + ->method('resolve') + ->willReturn($records); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolver = new DnsResolver($dnsResolver); + $resolution = $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + $deferredCancellation->cancel(); + + self::assertEquals($endpoints, $resolution->endpoints); + } + + /** + * @return iterable, non-empty-list}> + */ + public static function provideResolveCases(): iterable + { + yield 'single A record' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), + [new DnsRecord('192.168.0.1', DnsRecord::A, 300)], + [new Endpoint(new Address('192.168.0.1:50051'))], + ]; + + yield 'multiple A records' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), + [ + new DnsRecord('192.168.0.1', DnsRecord::A, 300), + new DnsRecord('192.168.0.2', DnsRecord::A, 300), + ], + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('192.168.0.2:50051')), + ], + ]; + + yield 'AAAA record wraps in brackets' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), + [new DnsRecord('::1', DnsRecord::AAAA, 300)], + [new Endpoint(new Address('[::1]:50051'))], + ]; + + yield 'mixed A and AAAA records' => [ + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), + [ + new DnsRecord('192.168.0.1', DnsRecord::A, 300), + new DnsRecord('::1', DnsRecord::AAAA, 300), + ], + [ + new Endpoint(new Address('192.168.0.1:50051')), + new Endpoint(new Address('[::1]:50051')), + ], + ]; + + yield 'passthrough uses same dns logic' => [ + new Target(Scheme::Passthrough, [new TargetAddress('myhost', 443)]), + [new DnsRecord('10.0.0.1', DnsRecord::A, 300)], + [new Endpoint(new Address('10.0.0.1:443'))], + ]; + } + + public function testResolveListener(): void + { + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::atLeastOnce()) + ->method('resolve') + ->willReturn( + [new DnsRecord('192.168.0.1', DnsRecord::A, 1)], + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::atLeastOnce()) + ->method('onResolve') + ->with( + self::isInstanceOf(Resolution::class), + ); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.1, maxResolveInterval: 0.1); + $resolver->resolve( + new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]), + $listener, + $deferredCancellation->getCancellation(), + ); + + delay(0.25); + $deferredCancellation->cancel(); + } + + public function testResolveStopOnCancellation(): void + { + $target = new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]); + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = self::createStub(AmphpDnsResolver::class); + $dnsResolver->method('resolve')->willReturn( + [new DnsRecord('192.168.0.1', DnsRecord::A, 300)], + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::never()) + ->method('onResolve'); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.5, maxResolveInterval: 0.5); + $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + + $deferredCancellation->cancel(); + delay(0.7); + } + + public function testResolveThrows(): void + { + $target = new Target(Scheme::Dns, [new TargetAddress('myhost', 50_051)]); + $deferredCancellation = new DeferredCancellation(); + + $dnsResolver = $this->createMock(AmphpDnsResolver::class); + $dnsResolver + ->expects(self::atLeastOnce()) + ->method('resolve') + ->willReturnOnConsecutiveCalls( + [new DnsRecord('192.168.0.1', DnsRecord::A, 1)], + self::throwException(new DnsException('DNS failed')), + ); + + $listener = $this->createMock(EndpointResolverListener::class); + $listener + ->expects(self::atLeastOnce()) + ->method('onResolve') + ->with( + self::isInstanceOf(\Throwable::class), + ); + + $resolver = new DnsResolver($dnsResolver, minResolveInterval: 0.1, maxResolveInterval: 0.1); + $resolver->resolve($target, $listener, $deferredCancellation->getCancellation()); + + delay(0.25); + $deferredCancellation->cancel(); + } +}