Skip to content
Open
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ check: fixer-check rector-check composer-validate composer-normalize-check deps-

compile-test-stub:
docker run --rm \
--pull always \
--user $(CONTAINER_USER) \
-v $(PWD):/workspace \
-w /workspace \
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@
"php": "^8.4",
"amphp/amp": "^3.1",
"amphp/byte-stream": "^2.1",
"amphp/cache": "^2.0",
"amphp/dns": "^2.4",
"amphp/http": "^2.1",
"amphp/http-client": "^5.3",
"amphp/http-server": "^3.4",
"amphp/pipeline": "^1.2",
"amphp/socket": "^2.3",
"amphp/sync": "^2.3",
"league/uri": "^7.8",
"psr/log": "^3.0",
"revolt/event-loop": "1.0.8",
"thesis/endian": "^0.3.0",
"thesis/endian": ">= 0.3.2",
"thesis/googleapis-rpc-types": "^0.1.6",
"thesis/package-version": "^0.1.2",
"thesis/protobuf": "^0.1.8",
Expand Down
27 changes: 27 additions & 0 deletions src/Client/Address.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

/**
* @api
*/
final readonly class Address implements \Stringable
{
/**
* @param non-empty-string $value
*/
public function __construct(
public string $value,
) {}

/**
* @return non-empty-string
*/
#[\Override]
public function __toString(): string
{
return $this->value;
}
}
20 changes: 20 additions & 0 deletions src/Client/Endpoint.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

/**
* @api
*/
final readonly class Endpoint
{
public function __construct(
public Address $address,
) {}

public function equals(self $other): bool
{
return $this->address->value === $other->address->value;
}
}
19 changes: 19 additions & 0 deletions src/Client/EndpointResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

use Amp\Cancellation;

/**
* @api
*/
interface EndpointResolver
{
public function resolve(
Target $target,
EndpointResolverListener $listener,
Cancellation $cancellation,
): Resolution;
}
169 changes: 169 additions & 0 deletions src/Client/EndpointResolver/DnsResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client\EndpointResolver;

use Amp\Cache\Cache;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\Dns\DnsConfig;
use Amp\Dns\DnsRecord;
use Amp\Dns\DnsResolver as AmphpDnsResolver;
use Amp\Dns\Rfc1035StubDnsResolver;
use Amp\Dns\StaticDnsConfigLoader;
use Revolt\EventLoop;
use Thesis\Grpc\Client\Address;
use Thesis\Grpc\Client\Endpoint;
use Thesis\Grpc\Client\EndpointResolver;
use Thesis\Grpc\Client\EndpointResolverListener;
use Thesis\Grpc\Client\Resolution;
use Thesis\Grpc\Client\Target;

/**
* @api
*/
final readonly class DnsResolver implements EndpointResolver
{
private const float MIN_RESOLVE_INTERVAL = 30;
private const float MAX_RESOLVE_INTERVAL = 300;

/**
* @param ?Cache<mixed> $cache
*/
public function __construct(
private ?AmphpDnsResolver $dns = null,
private ?Cache $cache = null,
private float $minResolveInterval = self::MIN_RESOLVE_INTERVAL,
private float $maxResolveInterval = self::MAX_RESOLVE_INTERVAL,
) {}

#[\Override]
public function resolve(
Target $target,
EndpointResolverListener $listener,
Cancellation $cancellation,
): Resolution {
$resolver = $this->dns ?? $this->configureResolver($target);

$result = $this->resolveNow(
$resolver,
$target,
$cancellation,
);

$resolveTTL = $this->computeResolveTTL($result);

if ($resolveTTL > 0) {
EventLoop::queue(
$this->resolveLater(...),
$resolver,
$listener,
$target,
$resolveTTL,
$cancellation,
);
}

return $result->resolution;
}

private function resolveNow(
AmphpDnsResolver $resolver,
Target $target,
Cancellation $cancellation,
): ResolveResult {
$endpoints = [];
$ttl = null;

foreach ($target->addresses as $address) {
$records = $resolver->resolve($address->host, cancellation: $cancellation);

foreach ($records as $record) {
$ip = $record->getValue();

if ($record->getType() === DnsRecord::AAAA) {
$ip = "[{$ip}]";
}

$endpoints[] = new Endpoint(
new Address("{$ip}:{$address->port}"),
);

if ($record->getTtl() !== null) {
$ttl = min($record->getTtl(), $ttl ?? \PHP_INT_MAX);
}
}
}

return new ResolveResult(
new Resolution($endpoints),
$ttl,
);
}

private function resolveLater(
AmphpDnsResolver $resolver,
EndpointResolverListener $listener,
Target $target,
float $ttl,
Cancellation $cancellation,
): void {
while (true) {
$suspension = EventLoop::getSuspension();
$timerId = EventLoop::delay($ttl, $suspension->resume(...));
$cancellationId = $cancellation->subscribe($suspension->throw(...));

try {
$suspension->suspend();

$result = $this->resolveNow($resolver, $target, $cancellation);
$listener->onResolve($result->resolution);

$ttl = $this->computeResolveTTL($result);

if ($ttl <= 0) {
return;
}
} catch (CancelledException) {
return;
} catch (\Throwable $e) {
$listener->onResolve($e);
} finally {
EventLoop::cancel($timerId);
$cancellation->unsubscribe($cancellationId);
}
}
}

private function configureResolver(Target $target): AmphpDnsResolver
{
$configLoader = null;
if ($target->authority !== null) {
$configLoader = new StaticDnsConfigLoader(
new DnsConfig([$target->authority]),
);
}

return new Rfc1035StubDnsResolver(
$this->cache,
$configLoader,
);
}

private function computeResolveTTL(ResolveResult $result): float
{
return max(
$this->minResolveInterval,
min($this->maxResolveInterval, $result->ttl ?? $this->minResolveInterval),
);
}
}

final readonly class ResolveResult
{
public function __construct(
public Resolution $resolution,
public ?float $ttl = null,
) {}
}
31 changes: 31 additions & 0 deletions src/Client/EndpointResolver/StaticResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client\EndpointResolver;

use Amp\Cancellation;
use Thesis\Grpc\Client\Address;
use Thesis\Grpc\Client\Endpoint;
use Thesis\Grpc\Client\EndpointResolver;
use Thesis\Grpc\Client\EndpointResolverListener;
use Thesis\Grpc\Client\Resolution;
use Thesis\Grpc\Client\Target;
use Thesis\Grpc\Client\TargetAddress;

/**
* @api
*/
final readonly class StaticResolver implements EndpointResolver
{
#[\Override]
public function resolve(Target $target, EndpointResolverListener $listener, Cancellation $cancellation): Resolution
{
return new Resolution(
array_map(
static fn(TargetAddress $address) => new Endpoint(new Address((string) $address)),
$target->addresses,
),
);
}
}
13 changes: 13 additions & 0 deletions src/Client/EndpointResolverListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

/**
* @api
*/
interface EndpointResolverListener
{
public function onResolve(Resolution|\Throwable $result): void;
}
18 changes: 18 additions & 0 deletions src/Client/InvalidTarget.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

use Thesis\Grpc\GrpcException;

/**
* @api
*/
final class InvalidTarget extends GrpcException
{
public function __construct(string $target)
{
parent::__construct(\sprintf('Invalid gRPC target: "%s".', $target));
}
}
19 changes: 19 additions & 0 deletions src/Client/LoadBalancer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Client;

/**
* @api
* @see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
*/
interface LoadBalancer
{
/**
* @param non-empty-list<Endpoint> $endpoints
*/
public function refresh(array $endpoints): void;

public function pick(PickContext $context): Endpoint;
}
Loading