diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index 108edb9..220c425 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -20,6 +20,7 @@ import io.moderne.jsonrpc.handler.MessageHandler; import lombok.RequiredArgsConstructor; +import java.io.EOFException; import java.util.Map; import java.util.concurrent.*; @@ -79,9 +80,13 @@ protected void compute() { } else if (response instanceof JsonRpcSuccess) { responseFuture.complete((JsonRpcSuccess) response); } - } else if (response instanceof JsonRpcError) { + } else if (response instanceof JsonRpcError && !openRequests.isEmpty()) { // Error with no id — fail all open requests since we - // can't correlate this error to a specific one + // can't correlate this error to a specific one. Skip + // when there's nothing to fail; allocating a Throwable + // (and filling its stack) per malformed message is + // expensive enough to peg a CPU when an upstream peer + // emits non-RPC noise on the wire. JsonRpcException exception = new JsonRpcException((JsonRpcError) response); for (CompletableFuture future : openRequests.values()) { future.completeExceptionally(exception); @@ -114,6 +119,17 @@ protected void compute() { }).fork(); } } + } catch (EOFException e) { + // Peer closed the stream — there's nothing more to read. + // Fail any in-flight requests once and exit the loop + // instead of spinning on the closed pipe. + JsonRpcException eof = new JsonRpcException( + JsonRpcError.internalError(null, "JSON-RPC peer closed the stream")); + for (CompletableFuture future : openRequests.values()) { + future.completeExceptionally(eof); + } + openRequests.clear(); + shutdown = true; } catch (Throwable t) { // Fork error sends off the reader thread to avoid // deadlock with synchronized send() diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index 428ab9d..1900248 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -64,10 +64,14 @@ public HeaderDelimitedMessageHandler(MessageFormatter formatter, InputStream inp } @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { MessageFormatter effectiveFormatter = this.formatter != null ? this.formatter : formatter; byte[] content = null; try { + // readLineFromInputStream throws EOFException when the peer has closed + // the stream cleanly between messages; let that propagate so the reader + // loop can exit instead of treating EOF as a malformed message and + // spinning at full CPU constructing exceptions for every empty read. String contentLength = readLineFromInputStream(); Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength); if (!contentLengthMatcher.matches()) { @@ -91,16 +95,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) { for (int totalRead = 0; totalRead < content.length; ) { int bytesRead = inputStream.read(content, totalRead, content.length - totalRead); if (bytesRead == -1) { - // Stream ended unexpectedly before reading full content - return JsonRpcError.invalidRequest(null, - "Content length mismatch. Expected " + content.length + - " but received " + totalRead); + // Mid-message EOF — treat as a closed stream rather than a + // recoverable parse error, otherwise the loop spins on the + // already-closed pipe. + throw new EOFException("Stream closed mid-message after " + totalRead + + " of " + content.length + " bytes"); } totalRead += bytesRead; } ByteArrayInputStream bis = new ByteArrayInputStream(content); return effectiveFormatter.deserialize(bis); + } catch (EOFException e) { + throw e; } catch (IOException e) { return JsonRpcError.invalidRequest(extractId(content), e.getMessage()); } @@ -142,15 +149,21 @@ public JsonRpcMessage receive(MessageFormatter formatter) { private String readLineFromInputStream() throws IOException { StringBuilder sb = new StringBuilder(); - int c; - while ((c = inputStream.read()) != -1) { + int c = inputStream.read(); + if (c == -1) { + // EOF before any byte was read: peer closed the stream between + // messages. Surface as EOFException so the reader loop can shut + // down instead of returning an empty string that the caller would + // misinterpret as a malformed header. + throw new EOFException("Stream closed"); + } + do { if (c == '\n') { break; - } else if (c == '\r') { - continue; + } else if (c != '\r') { + sb.append((char) c); } - sb.append((char) c); - } + } while ((c = inputStream.read()) != -1); return sb.toString(); } diff --git a/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java index 3c5a35e..faef8a2 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java @@ -18,9 +18,17 @@ import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.formatter.MessageFormatter; +import java.io.EOFException; +import java.io.IOException; + public interface MessageHandler { - JsonRpcMessage receive(MessageFormatter formatter); + /** + * @throws EOFException when the underlying stream has been closed cleanly + * between messages — the reader loop should shut down. + * @throws IOException for unrecoverable transport failures. + */ + JsonRpcMessage receive(MessageFormatter formatter) throws IOException; void send(JsonRpcMessage msg, MessageFormatter formatter); } diff --git a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java index b391d4c..ca29ea7 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java @@ -24,13 +24,15 @@ import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; +import java.io.IOException; + @RequiredArgsConstructor public class MeteredMessageHandler implements MessageHandler { private final MessageHandler delegate; private final MeterRegistry meterRegistry; @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { Timer.Sample sample = Timer.start(meterRegistry); Timer.Builder timer = Timer.builder("jsonrpc.receive") .description("Time taken to receive a JSON-RPC message") diff --git a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java index 6408dd9..e422391 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java @@ -31,20 +31,19 @@ public class NewLineDelimitedMessageHandler implements MessageHandler { private final OutputStream outputStream; @Override - public JsonRpcMessage receive(MessageFormatter formatter) { - try { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - int b; - while ((b = inputStream.read()) != -1) { - buffer.write(b); - if (b == '\n') { - break; - } - } - return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); - } catch (IOException e) { - throw new UncheckedIOException(e); + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + int b = inputStream.read(); + if (b == -1) { + throw new EOFException("Stream closed"); } + do { + buffer.write(b); + if (b == '\n') { + break; + } + } while ((b = inputStream.read()) != -1); + return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); } @Override diff --git a/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java index 46af21a..87b00ca 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/TraceMessageHandler.java @@ -21,6 +21,7 @@ import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; +import java.io.IOException; import java.io.PrintStream; @RequiredArgsConstructor @@ -34,7 +35,7 @@ public TraceMessageHandler(String name, MessageHandler delegate) { } @Override - public JsonRpcMessage receive(MessageFormatter formatter) { + public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { JsonRpcMessage message = delegate.receive(formatter); if (message instanceof JsonRpcResponse) { out.printf("<-(%s)- %s%n", name, message); diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index feef23a..122441b 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -22,10 +22,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -137,6 +140,30 @@ void positionalRequestMismatchedToNamedParameterMethod() { ).hasCauseInstanceOf(JsonRpcException.class); } + @Test + void readerLoopExitsCleanlyOnEof() throws Exception { + // When the peer closes the stream, the reader loop must shut down + // instead of spinning at full CPU constructing JsonRpcException + // instances per garbage byte. + ByteArrayInputStream closedStream = new ByteArrayInputStream(new byte[0]); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonMessageFormatter formatter = new JsonMessageFormatter(); + JsonRpc localRpc = new JsonRpc( + new HeaderDelimitedMessageHandler(closedStream, out), + formatter) + .bind(); + try { + CompletableFuture inFlight = + localRpc.send(JsonRpcRequest.newRequest("never-arrives")); + + // The in-flight request must fail (not hang) once the loop sees EOF. + assertThatThrownBy(() -> inFlight.get(5, TimeUnit.SECONDS)) + .hasCauseInstanceOf(JsonRpcException.class); + } finally { + localRpc.shutdown(); + } + } + record Person(String name) { } diff --git a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java new file mode 100644 index 0000000..e312e6f --- /dev/null +++ b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import io.moderne.jsonrpc.JsonRpcError; +import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.InputStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class HeaderDelimitedMessageHandlerTest { + private static final JsonMessageFormatter FORMATTER = new JsonMessageFormatter(); + + @Test + void receiveThrowsEofWhenStreamClosedBetweenMessages() { + InputStream empty = new ByteArrayInputStream(new byte[0]); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(empty, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class); + } + + @Test + void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception { + // A non-RPC line on the wire should still produce a recoverable + // invalidRequest error (not EOF) so callers can decide what to do. + InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes()); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream()); + + JsonRpcMessage msg = handler.receive(FORMATTER); + + assertThat(msg).isInstanceOf(JsonRpcError.class); + assertThat(((JsonRpcError) msg).getError().getMessage()) + .contains("Expected Content-Length header"); + } + + @Test + void receiveThrowsEofWhenStreamClosesMidMessage() { + // Headers parsed, body promised but stream cuts off — must surface as + // EOF so the reader loop can shut down rather than spin. + String partial = "Content-Length: 100\r\n\r\n{partial"; + InputStream truncated = new ByteArrayInputStream(partial.getBytes()); + HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(truncated, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class); + } +}