Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/main/java/io/moderne/jsonrpc/JsonRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moderne.jsonrpc.handler.MessageHandler;
import lombok.RequiredArgsConstructor;

import java.io.EOFException;
import java.util.Map;
import java.util.concurrent.*;

Expand Down Expand Up @@ -79,9 +80,13 @@ protected void compute() {
} else if (response instanceof JsonRpcSuccess) {
responseFuture.complete((JsonRpcSuccess) response);
}
} else if (response instanceof JsonRpcError) {
} else if (response instanceof JsonRpcError && !openRequests.isEmpty()) {
// Error with no id — fail all open requests since we
// can't correlate this error to a specific one
// can't correlate this error to a specific one. Skip
// when there's nothing to fail; allocating a Throwable
// (and filling its stack) per malformed message is
// expensive enough to peg a CPU when an upstream peer
// emits non-RPC noise on the wire.
JsonRpcException exception = new JsonRpcException((JsonRpcError) response);
for (CompletableFuture<JsonRpcSuccess> future : openRequests.values()) {
future.completeExceptionally(exception);
Expand Down Expand Up @@ -114,6 +119,17 @@ protected void compute() {
}).fork();
}
}
} catch (EOFException e) {
// Peer closed the stream — there's nothing more to read.
// Fail any in-flight requests once and exit the loop
// instead of spinning on the closed pipe.
JsonRpcException eof = new JsonRpcException(
JsonRpcError.internalError(null, "JSON-RPC peer closed the stream"));
for (CompletableFuture<JsonRpcSuccess> future : openRequests.values()) {
future.completeExceptionally(eof);
}
openRequests.clear();
shutdown = true;
} catch (Throwable t) {
// Fork error sends off the reader thread to avoid
// deadlock with synchronized send()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ public HeaderDelimitedMessageHandler(MessageFormatter formatter, InputStream inp
}

@Override
public JsonRpcMessage receive(MessageFormatter formatter) {
public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
MessageFormatter effectiveFormatter = this.formatter != null ? this.formatter : formatter;
byte[] content = null;
try {
// readLineFromInputStream throws EOFException when the peer has closed
// the stream cleanly between messages; let that propagate so the reader
// loop can exit instead of treating EOF as a malformed message and
// spinning at full CPU constructing exceptions for every empty read.
String contentLength = readLineFromInputStream();
Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength);
if (!contentLengthMatcher.matches()) {
Expand All @@ -91,16 +95,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) {
for (int totalRead = 0; totalRead < content.length; ) {
int bytesRead = inputStream.read(content, totalRead, content.length - totalRead);
if (bytesRead == -1) {
// Stream ended unexpectedly before reading full content
return JsonRpcError.invalidRequest(null,
"Content length mismatch. Expected " + content.length +
" but received " + totalRead);
// Mid-message EOF — treat as a closed stream rather than a
// recoverable parse error, otherwise the loop spins on the
// already-closed pipe.
throw new EOFException("Stream closed mid-message after " + totalRead +
" of " + content.length + " bytes");
}
totalRead += bytesRead;
}

ByteArrayInputStream bis = new ByteArrayInputStream(content);
return effectiveFormatter.deserialize(bis);
} catch (EOFException e) {
throw e;
} catch (IOException e) {
return JsonRpcError.invalidRequest(extractId(content), e.getMessage());
}
Expand Down Expand Up @@ -142,15 +149,21 @@ public JsonRpcMessage receive(MessageFormatter formatter) {

private String readLineFromInputStream() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
while ((c = inputStream.read()) != -1) {
int c = inputStream.read();
if (c == -1) {
// EOF before any byte was read: peer closed the stream between
// messages. Surface as EOFException so the reader loop can shut
// down instead of returning an empty string that the caller would
// misinterpret as a malformed header.
throw new EOFException("Stream closed");
}
do {
if (c == '\n') {
break;
} else if (c == '\r') {
continue;
} else if (c != '\r') {
sb.append((char) c);
}
sb.append((char) c);
}
} while ((c = inputStream.read()) != -1);
return sb.toString();
}

Expand Down
10 changes: 9 additions & 1 deletion src/main/java/io/moderne/jsonrpc/handler/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@
import io.moderne.jsonrpc.JsonRpcMessage;
import io.moderne.jsonrpc.formatter.MessageFormatter;

import java.io.EOFException;
import java.io.IOException;

public interface MessageHandler {

JsonRpcMessage receive(MessageFormatter formatter);
/**
* @throws EOFException when the underlying stream has been closed cleanly
* between messages — the reader loop should shut down.
* @throws IOException for unrecoverable transport failures.
*/
JsonRpcMessage receive(MessageFormatter formatter) throws IOException;

void send(JsonRpcMessage msg, MessageFormatter formatter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import io.moderne.jsonrpc.formatter.MessageFormatter;
import lombok.RequiredArgsConstructor;

import java.io.IOException;

@RequiredArgsConstructor
public class MeteredMessageHandler implements MessageHandler {
private final MessageHandler delegate;
private final MeterRegistry meterRegistry;

@Override
public JsonRpcMessage receive(MessageFormatter formatter) {
public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
Timer.Sample sample = Timer.start(meterRegistry);
Timer.Builder timer = Timer.builder("jsonrpc.receive")
.description("Time taken to receive a JSON-RPC message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,19 @@ public class NewLineDelimitedMessageHandler implements MessageHandler {
private final OutputStream outputStream;

@Override
public JsonRpcMessage receive(MessageFormatter formatter) {
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = inputStream.read()) != -1) {
buffer.write(b);
if (b == '\n') {
break;
}
}
return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray()));
} catch (IOException e) {
throw new UncheckedIOException(e);
public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b = inputStream.read();
if (b == -1) {
throw new EOFException("Stream closed");
}
do {
buffer.write(b);
if (b == '\n') {
break;
}
} while ((b = inputStream.read()) != -1);
return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.moderne.jsonrpc.formatter.MessageFormatter;
import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.io.PrintStream;

@RequiredArgsConstructor
Expand All @@ -34,7 +35,7 @@ public TraceMessageHandler(String name, MessageHandler delegate) {
}

@Override
public JsonRpcMessage receive(MessageFormatter formatter) {
public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
JsonRpcMessage message = delegate.receive(formatter);
if (message instanceof JsonRpcResponse) {
out.printf("<-(%s)- %s%n", name, message);
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/io/moderne/jsonrpc/JsonRpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -137,6 +140,30 @@ void positionalRequestMismatchedToNamedParameterMethod() {
).hasCauseInstanceOf(JsonRpcException.class);
}

@Test
void readerLoopExitsCleanlyOnEof() throws Exception {
// When the peer closes the stream, the reader loop must shut down
// instead of spinning at full CPU constructing JsonRpcException
// instances per garbage byte.
ByteArrayInputStream closedStream = new ByteArrayInputStream(new byte[0]);
ByteArrayOutputStream out = new ByteArrayOutputStream();
JsonMessageFormatter formatter = new JsonMessageFormatter();
JsonRpc localRpc = new JsonRpc(
new HeaderDelimitedMessageHandler(closedStream, out),
formatter)
.bind();
try {
CompletableFuture<JsonRpcSuccess> inFlight =
localRpc.send(JsonRpcRequest.newRequest("never-arrives"));

// The in-flight request must fail (not hang) once the loop sees EOF.
assertThatThrownBy(() -> inFlight.get(5, TimeUnit.SECONDS))
.hasCauseInstanceOf(JsonRpcException.class);
} finally {
localRpc.shutdown();
}
}

record Person(String name) {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2025 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.moderne.jsonrpc.handler;

import io.moderne.jsonrpc.JsonRpcError;
import io.moderne.jsonrpc.JsonRpcMessage;
import io.moderne.jsonrpc.formatter.JsonMessageFormatter;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.InputStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class HeaderDelimitedMessageHandlerTest {
private static final JsonMessageFormatter FORMATTER = new JsonMessageFormatter();

@Test
void receiveThrowsEofWhenStreamClosedBetweenMessages() {
InputStream empty = new ByteArrayInputStream(new byte[0]);
HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(empty, new ByteArrayOutputStream());

assertThatThrownBy(() -> handler.receive(FORMATTER))
.isInstanceOf(EOFException.class);
}

@Test
void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception {
// A non-RPC line on the wire should still produce a recoverable
// invalidRequest error (not EOF) so callers can decide what to do.
InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes());
HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream());

JsonRpcMessage msg = handler.receive(FORMATTER);

assertThat(msg).isInstanceOf(JsonRpcError.class);
assertThat(((JsonRpcError) msg).getError().getMessage())
.contains("Expected Content-Length header");
}

@Test
void receiveThrowsEofWhenStreamClosesMidMessage() {
// Headers parsed, body promised but stream cuts off — must surface as
// EOF so the reader loop can shut down rather than spin.
String partial = "Content-Length: 100\r\n\r\n{partial";
InputStream truncated = new ByteArrayInputStream(partial.getBytes());
HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(truncated, new ByteArrayOutputStream());

assertThatThrownBy(() -> handler.receive(FORMATTER))
.isInstanceOf(EOFException.class);
}
}
Loading