Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions src/Internal/Http2/StreamCodec.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,24 @@ public function decode(
/** @var Pipeline\Queue<T> $out */
$out = new Pipeline\Queue();

$encoder = $this->encoder;
$compressor = $this->compressor;
$parser = new Protocol\Parser(
$out->push(...),
$type,
$this->encoder,
$this->compressor,
);

EventLoop::queue(static function () use (
$encoder,
$compressor,
$parser,
$in,
$out,
$type,
$cancellation,
): void {
try {
while (($message = $in->read($cancellation)) !== null) {
\assert($message !== '', 'gRPC frame must not be empty.');

$frame = Protocol\decodeFrame($message);

$buffer = $frame->buffer;

if ($frame->compressed && $buffer !== '') {
$buffer = $compressor->decompress($buffer);
}

$out->push($encoder->decode($buffer, $type));
while (($chunk = $in->read($cancellation)) !== null) {
$parser->push($chunk);
}
} catch (Pipeline\DisposedException|CancelledException) {
} catch (Pipeline\DisposedException|CancelledException) { // @phpstan-ignore catch.neverThrown, catch.neverThrown
} catch (\Throwable $e) {
$out->error($e);
} finally {
Expand Down
61 changes: 61 additions & 0 deletions src/Internal/Protocol/Parser.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Internal\Protocol;

use Thesis\Grpc\Compression;
use Thesis\Grpc\Encoding;

/**
* @internal
* @template T of object
*/
final class Parser
{
private string $buffer = '';

/**
* @param \Closure(T): void $push
* @param class-string<T> $type
*/
public function __construct(
private readonly \Closure $push,
private readonly string $type,
private readonly Encoding\Encoder $encoder,
private readonly Compression\Compressor $compressor,
) {}

/**
* @throws Compression\DecompressionFailed
* @throws Encoding\DecodingFailed
*/
public function push(string $data): void
{
$this->buffer .= $data;

while (\strlen($this->buffer) >= bodyOffset) {
$messageLength = byteOrder->unpackUint32(
/** @phpstan-ignore argument.type */
substr($this->buffer, lengthOffset, 4),
);

$frameSize = bodyOffset + $messageLength;

if (\strlen($this->buffer) < $frameSize) {
break;
}

$frame = decodeFrame(substr($this->buffer, 0, $frameSize));
$this->buffer = substr($this->buffer, $frameSize);

$buffer = $frame->buffer;

if ($frame->compressed && $buffer !== '') {
$buffer = $this->compressor->decompress($buffer);
}

($this->push)($this->encoder->decode($buffer, $this->type));
}
}
}
134 changes: 134 additions & 0 deletions tests/Internal/Protocol/ParserTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<?php

declare(strict_types=1);

namespace Thesis\Grpc\Internal\Protocol;

use Echos\Api\V1\EchoRequest;
use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;
use Thesis\Grpc\Compression\Compressor;
use Thesis\Grpc\Compression\GzipCompressor;
use Thesis\Grpc\Compression\IdentityCompressor;
use Thesis\Grpc\Protobuf\ProtobufEncoder;

#[CoversClass(Parser::class)]
final class ParserTest extends TestCase
{
/**
* @param list<string> $chunks
* @param list<EchoRequest> $expected
*/
#[DataProvider('providePushCases')]
public function testPush(array $chunks, array $expected, Compressor $compressor): void
{
$frames = [];

$parser = new Parser(
static function (EchoRequest $request) use (&$frames): void {
$frames[] = $request;
},
EchoRequest::class,
ProtobufEncoder::default(),
$compressor,
);

foreach ($chunks as $chunk) {
$parser->push($chunk);
}

self::assertEquals($expected, $frames);
}

/**
* @return iterable<string, array{list<string>, list<EchoRequest>, Compressor}>
*/
public static function providePushCases(): iterable
{
foreach ([IdentityCompressor::Compressor, new GzipCompressor()] as $compressor) {
$name = $compressor->name();

yield "{$name}: single complete frame" => [
[self::frame('hello', $compressor)],
[
new EchoRequest('hello'),
],
$compressor,
];

yield "{$name}: two frames in one chunk" => [
[self::frame('first', $compressor) . self::frame('second', $compressor)],
[
new EchoRequest('first'),
new EchoRequest('second'),
],
$compressor,
];

yield "{$name}: three frames in one chunk" => [
[self::frame('a', $compressor) . self::frame('bb', $compressor) . self::frame('ccc', $compressor)],
[
new EchoRequest('a'),
new EchoRequest('bb'),
new EchoRequest('ccc'),
],
$compressor,
];

yield "{$name}: frame split across two chunks" => [
(static function () use ($compressor): array {
$frame = self::frame('whole', $compressor);
$mid = (int) (\strlen($frame) / 2);

return [substr($frame, 0, $mid), substr($frame, $mid)];
})(),
[
new EchoRequest('whole'),
],
$compressor,
];

yield "{$name}: one complete frame + half of second" => [
(static function () use ($compressor): array {
$f1 = self::frame('complete', $compressor);
$f2 = self::frame('split', $compressor);
$mid = (int) (\strlen($f2) / 2);

return [$f1 . substr($f2, 0, $mid), substr($f2, $mid)];
})(),
[
new EchoRequest('complete'),
new EchoRequest('split'),
],
$compressor,
];

yield "{$name}: byte by byte" => [
str_split(self::frame('slow', $compressor)),
[
new EchoRequest('slow'),
],
$compressor,
];

yield "{$name}: empty payload" => [
[encodeFrame(new Frame(false, ''))],
[
new EchoRequest(),
],
$compressor,
];
}
}

private static function frame(string $sentence, Compressor $compressor = IdentityCompressor::Compressor): string
{
$payload = ProtobufEncoder::default()->encode(new EchoRequest($sentence));

return encodeFrame(new Frame(
$compressor->name() !== IdentityCompressor::Compressor->name(),
$payload !== '' ? $compressor->compress($payload) : $payload,
));
}
}