From 2082f59933dc103e382890fa810553d36f0b43af Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Sun, 5 Apr 2026 14:43:23 +0300 Subject: [PATCH] Implement buffered grpc parser --- src/Internal/Http2/StreamCodec.php | 28 ++---- src/Internal/Protocol/Parser.php | 61 +++++++++++ tests/Internal/Protocol/ParserTest.php | 134 +++++++++++++++++++++++++ 3 files changed, 205 insertions(+), 18 deletions(-) create mode 100644 src/Internal/Protocol/Parser.php create mode 100644 tests/Internal/Protocol/ParserTest.php diff --git a/src/Internal/Http2/StreamCodec.php b/src/Internal/Http2/StreamCodec.php index e5b0e6e..d28a378 100644 --- a/src/Internal/Http2/StreamCodec.php +++ b/src/Internal/Http2/StreamCodec.php @@ -88,32 +88,24 @@ public function decode( /** @var Pipeline\Queue $out */ $out = new Pipeline\Queue(); - $encoder = $this->encoder; - $compressor = $this->compressor; + $parser = new Protocol\Parser( + $out->push(...), + $type, + $this->encoder, + $this->compressor, + ); EventLoop::queue(static function () use ( - $encoder, - $compressor, + $parser, $in, $out, - $type, $cancellation, ): void { try { - while (($message = $in->read($cancellation)) !== null) { - \assert($message !== '', 'gRPC frame must not be empty.'); - - $frame = Protocol\decodeFrame($message); - - $buffer = $frame->buffer; - - if ($frame->compressed && $buffer !== '') { - $buffer = $compressor->decompress($buffer); - } - - $out->push($encoder->decode($buffer, $type)); + while (($chunk = $in->read($cancellation)) !== null) { + $parser->push($chunk); } - } catch (Pipeline\DisposedException|CancelledException) { + } catch (Pipeline\DisposedException|CancelledException) { // @phpstan-ignore catch.neverThrown, catch.neverThrown } catch (\Throwable $e) { $out->error($e); } finally { diff --git a/src/Internal/Protocol/Parser.php b/src/Internal/Protocol/Parser.php new file mode 100644 index 0000000..ff1ce49 --- /dev/null +++ b/src/Internal/Protocol/Parser.php @@ -0,0 +1,61 @@ + $type + */ + public function __construct( + private readonly \Closure $push, + private readonly string $type, + private readonly Encoding\Encoder $encoder, + private readonly Compression\Compressor $compressor, + ) {} + + /** + * @throws Compression\DecompressionFailed + * @throws Encoding\DecodingFailed + */ + public function push(string $data): void + { + $this->buffer .= $data; + + while (\strlen($this->buffer) >= bodyOffset) { + $messageLength = byteOrder->unpackUint32( + /** @phpstan-ignore argument.type */ + substr($this->buffer, lengthOffset, 4), + ); + + $frameSize = bodyOffset + $messageLength; + + if (\strlen($this->buffer) < $frameSize) { + break; + } + + $frame = decodeFrame(substr($this->buffer, 0, $frameSize)); + $this->buffer = substr($this->buffer, $frameSize); + + $buffer = $frame->buffer; + + if ($frame->compressed && $buffer !== '') { + $buffer = $this->compressor->decompress($buffer); + } + + ($this->push)($this->encoder->decode($buffer, $this->type)); + } + } +} diff --git a/tests/Internal/Protocol/ParserTest.php b/tests/Internal/Protocol/ParserTest.php new file mode 100644 index 0000000..8553fdc --- /dev/null +++ b/tests/Internal/Protocol/ParserTest.php @@ -0,0 +1,134 @@ + $chunks + * @param list $expected + */ + #[DataProvider('providePushCases')] + public function testPush(array $chunks, array $expected, Compressor $compressor): void + { + $frames = []; + + $parser = new Parser( + static function (EchoRequest $request) use (&$frames): void { + $frames[] = $request; + }, + EchoRequest::class, + ProtobufEncoder::default(), + $compressor, + ); + + foreach ($chunks as $chunk) { + $parser->push($chunk); + } + + self::assertEquals($expected, $frames); + } + + /** + * @return iterable, list, Compressor}> + */ + public static function providePushCases(): iterable + { + foreach ([IdentityCompressor::Compressor, new GzipCompressor()] as $compressor) { + $name = $compressor->name(); + + yield "{$name}: single complete frame" => [ + [self::frame('hello', $compressor)], + [ + new EchoRequest('hello'), + ], + $compressor, + ]; + + yield "{$name}: two frames in one chunk" => [ + [self::frame('first', $compressor) . self::frame('second', $compressor)], + [ + new EchoRequest('first'), + new EchoRequest('second'), + ], + $compressor, + ]; + + yield "{$name}: three frames in one chunk" => [ + [self::frame('a', $compressor) . self::frame('bb', $compressor) . self::frame('ccc', $compressor)], + [ + new EchoRequest('a'), + new EchoRequest('bb'), + new EchoRequest('ccc'), + ], + $compressor, + ]; + + yield "{$name}: frame split across two chunks" => [ + (static function () use ($compressor): array { + $frame = self::frame('whole', $compressor); + $mid = (int) (\strlen($frame) / 2); + + return [substr($frame, 0, $mid), substr($frame, $mid)]; + })(), + [ + new EchoRequest('whole'), + ], + $compressor, + ]; + + yield "{$name}: one complete frame + half of second" => [ + (static function () use ($compressor): array { + $f1 = self::frame('complete', $compressor); + $f2 = self::frame('split', $compressor); + $mid = (int) (\strlen($f2) / 2); + + return [$f1 . substr($f2, 0, $mid), substr($f2, $mid)]; + })(), + [ + new EchoRequest('complete'), + new EchoRequest('split'), + ], + $compressor, + ]; + + yield "{$name}: byte by byte" => [ + str_split(self::frame('slow', $compressor)), + [ + new EchoRequest('slow'), + ], + $compressor, + ]; + + yield "{$name}: empty payload" => [ + [encodeFrame(new Frame(false, ''))], + [ + new EchoRequest(), + ], + $compressor, + ]; + } + } + + private static function frame(string $sentence, Compressor $compressor = IdentityCompressor::Compressor): string + { + $payload = ProtobufEncoder::default()->encode(new EchoRequest($sentence)); + + return encodeFrame(new Frame( + $compressor->name() !== IdentityCompressor::Compressor->name(), + $payload !== '' ? $compressor->compress($payload) : $payload, + )); + } +}