From 722e15beb5fcc35ab944231b6cf82c5a644693f9 Mon Sep 17 00:00:00 2001 From: Jonathan Schneider Date: Wed, 6 May 2026 16:50:36 -0400 Subject: [PATCH] Detect stream EOF in reader loop instead of spinning When an RPC peer dies, its stdout pipe goes to permanent EOF. The previous reader loop treated EOF as an empty header line, synthesized a JsonRpcError with no id, constructed a JsonRpcException, looped, and repeated -- pegging a CPU core for the lifetime of the JVM. Observed in production where rewrite-marketplace burned ~76% of one core for 20+ hours filling stack traces after npx exited at startup. The handlers now distinguish "stream closed" (EOFException) from "malformed message on a live stream" (recoverable JsonRpcError). The JsonRpc reader loop catches EOFException once, fails any in-flight requests, and exits. Also guards the no-id error broadcast: skip the JsonRpcException allocation entirely when there are no open requests to fail. --- src/main/java/io/moderne/jsonrpc/JsonRpc.java | 20 +++++- .../HeaderDelimitedMessageHandler.java | 35 +++++++--- .../jsonrpc/handler/MessageHandler.java | 10 ++- .../handler/MeteredMessageHandler.java | 4 +- .../NewLineDelimitedMessageHandler.java | 25 ++++--- .../jsonrpc/handler/TraceMessageHandler.java | 3 +- .../java/io/moderne/jsonrpc/JsonRpcTest.java | 27 ++++++++ .../HeaderDelimitedMessageHandlerTest.java | 68 +++++++++++++++++++ 8 files changed, 163 insertions(+), 29 deletions(-) create mode 100644 src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index 108edb9..220c425 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -20,6 +20,7 @@ import io.moderne.jsonrpc.handler.MessageHandler; import lombok.RequiredArgsConstructor; +import java.io.EOFException; import java.util.Map; import java.util.concurrent.*; @@ -79,9 +80,13 @@ protected void compute() { } else if (response instanceof JsonRpcSuccess) { responseFuture.complete((JsonRpcSuccess) response); } - } else if (response instanceof JsonRpcError) { + } else if (response instanceof JsonRpcError && !openRequests.isEmpty()) { // Error with no id — fail all open requests since we - // can't correlate this error to a specific one + // can't correlate this error to a specific one. Skip + // when there's nothing to fail; allocating a Throwable + // (and filling its stack) per malformed message is + // expensive enough to peg a CPU when an upstream peer + // emits non-RPC noise on the wire. JsonRpcException exception = new JsonRpcException((JsonRpcError) response); for (CompletableFuture future : openRequests.values()) { future.completeExceptionally(exception); @@ -114,6 +119,17 @@ protected void compute() { }).fork(); } } + } catch (EOFException e) { + // Peer closed the stream — there's nothing more to read. + // Fail any in-flight requests once and exit the loop + // instead of spinning on the closed pipe. + JsonRpcException eof = new JsonRpcException( + JsonRpcError.internalError(null, "JSON-RPC peer closed the stream")); + for (CompletableFuture future : openRequests.values()) { + future.completeExceptionally(eof); + } + openRequests.clear(); + shutdown = true; } catch (Throwable t) { // Fork error sends off the reader thread to avoid // deadlock with synchronized send() diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index 428ab9d..1900248 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -64,10 +64,14 @@ public HeaderDelimitedMessageHandler(MessageFormatter formatter, InputStream inp } @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { MessageFormatter effectiveFormatter = this.formatter != null ? this.formatter : formatter; byte[] content = null; try { + // readLineFromInputStream throws EOFException when the peer has closed + // the stream cleanly between messages; let that propagate so the reader + // loop can exit instead of treating EOF as a malformed message and + // spinning at full CPU constructing exceptions for every empty read. String contentLength = readLineFromInputStream(); Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength); if (!contentLengthMatcher.matches()) { @@ -91,16 +95,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) { for (int totalRead = 0; totalRead < content.length; ) { int bytesRead = inputStream.read(content, totalRead, content.length - totalRead); if (bytesRead == -1) { - // Stream ended unexpectedly before reading full content - return JsonRpcError.invalidRequest(null, - "Content length mismatch. Expected " + content.length + - " but received " + totalRead); + // Mid-message EOF — treat as a closed stream rather than a + // recoverable parse error, otherwise the loop spins on the + // already-closed pipe. + throw new EOFException("Stream closed mid-message after " + totalRead + + " of " + content.length + " bytes"); } totalRead += bytesRead; } ByteArrayInputStream bis = new ByteArrayInputStream(content); return effectiveFormatter.deserialize(bis); + } catch (EOFException e) { + throw e; } catch (IOException e) { return JsonRpcError.invalidRequest(extractId(content), e.getMessage()); } @@ -142,15 +149,21 @@ public JsonRpcMessage receive(MessageFormatter formatter) { private String readLineFromInputStream() throws IOException { StringBuilder sb = new StringBuilder(); - int c; - while ((c = inputStream.read()) != -1) { + int c = inputStream.read(); + if (c == -1) { + // EOF before any byte was read: peer closed the stream between + // messages. Surface as EOFException so the reader loop can shut + // down instead of returning an empty string that the caller would + // misinterpret as a malformed header. + throw new EOFException("Stream closed"); + } + do { if (c == '\n') { break; - } else if (c == '\r') { - continue; + } else if (c != '\r') { + sb.append((char) c); } - sb.append((char) c); - } + } while ((c = inputStream.read()) != -1); return sb.toString(); } diff --git a/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java index 3c5a35e..faef8a2 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java @@ -18,9 +18,17 @@ import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.formatter.MessageFormatter; +import java.io.EOFException; +import java.io.IOException; + public interface MessageHandler { - JsonRpcMessage receive(MessageFormatter formatter); + /** + * @throws EOFException when the underlying stream has been closed cleanly + * between messages — the reader loop should shut down. + * @throws IOException for unrecoverable transport failures. + */ + JsonRpcMessage receive(MessageFormatter formatter) throws IOException; void send(JsonRpcMessage msg, MessageFormatter formatter); } diff --git a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java index b391d4c..ca29ea7 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java @@ -24,13 +24,15 @@ import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; +import java.io.IOException; + @RequiredArgsConstructor public class MeteredMessageHandler implements MessageHandler { private final MessageHandler delegate; private final MeterRegistry meterRegistry; @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { Timer.Sample sample = Timer.start(meterRegistry); Timer.Builder timer = Timer.builder("jsonrpc.receive") .description("Time taken to receive a JSON-RPC message") diff --git a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java index 6408dd9..e422391 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java @@ -31,20 +31,19 @@ public class NewLineDelimitedMessageHandler implements MessageHandler { private final OutputStream outputStream; @Override - public JsonRpcMessage receive(MessageFormatter formatter) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - int b; - while ((b = inputStream.read()) != -1) { - buffer.write(b); - if (b == '\n') { - break; - } - } - return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); - } catch (IOException e) { - throw new UncheckedIOException(e); + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + int b = inputStream.read(); + if (b == -1) { + throw new EOFException("Stream closed"); } + do { + buffer.write(b); + if (b == '\n') { + break; + } + } while ((b = inputStream.read()) != -1); + return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); } @Override diff --git a/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java index 46af21a..87b00ca 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java @@ -21,6 +21,7 @@ import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; +import java.io.IOException; import java.io.PrintStream; @RequiredArgsConstructor @@ -34,7 +35,7 @@ public TraceMessageHandler(String name, MessageHandler delegate) { } @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { JsonRpcMessage message = delegate.receive(formatter); if (message instanceof JsonRpcResponse) { out.printf("<-(%s)- %s%n", name, message); diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index feef23a..122441b 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -22,10 +22,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -137,6 +140,30 @@ void positionalRequestMismatchedToNamedParameterMethod() { ).hasCauseInstanceOf(JsonRpcException.class); } + @Test + void readerLoopExitsCleanlyOnEof() throws Exception { + // When the peer closes the stream, the reader loop must shut down + // instead of spinning at full CPU constructing JsonRpcException + // instances per garbage byte. + ByteArrayInputStream closedStream = new ByteArrayInputStream(new byte[0]); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonMessageFormatter formatter = new JsonMessageFormatter(); + JsonRpc localRpc = new JsonRpc( + new HeaderDelimitedMessageHandler(closedStream, out), + formatter) + .bind(); + try { + CompletableFuture inFlight = + localRpc.send(JsonRpcRequest.newRequest("never-arrives")); + + // The in-flight request must fail (not hang) once the loop sees EOF. + assertThatThrownBy(() -> inFlight.get(5, TimeUnit.SECONDS)) + .hasCauseInstanceOf(JsonRpcException.class); + } finally { + localRpc.shutdown(); + } + } + record Person(String name) { } diff --git a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java new file mode 100644 index 0000000..e312e6f --- /dev/null +++ b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import io.moderne.jsonrpc.JsonRpcError; +import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.InputStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class HeaderDelimitedMessageHandlerTest { + private static final JsonMessageFormatter FORMATTER = new JsonMessageFormatter(); + + @Test + void receiveThrowsEofWhenStreamClosedBetweenMessages() { + InputStream empty = new ByteArrayInputStream(new byte[0]); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(empty, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class); + } + + @Test + void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception { + // A non-RPC line on the wire should still produce a recoverable + // invalidRequest error (not EOF) so callers can decide what to do. + InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes()); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream()); + + JsonRpcMessage msg = handler.receive(FORMATTER); + + assertThat(msg).isInstanceOf(JsonRpcError.class); + assertThat(((JsonRpcError) msg).getError().getMessage()) + .contains("Expected Content-Length header"); + } + + @Test + void receiveThrowsEofWhenStreamClosesMidMessage() { + // Headers parsed, body promised but stream cuts off — must surface as + // EOF so the reader loop can shut down rather than spin. + String partial = "Content-Length: 100\r\n\r\n{partial"; + InputStream truncated = new ByteArrayInputStream(partial.getBytes()); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(truncated, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class); + } +}