Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/java/io/moderne/jsonrpc/JsonRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ protected void compute() {
}
openRequests.clear();
shutdown = true;
} catch (JsonRpcReceiveException e) {
// Frame- or parse-level failure on an inbound message.
// Send the error back to the peer; do NOT touch
// openRequests — those track responses we're waiting
// for from the peer, and the peer's malformed message
// is not one of them. Treating it as one would either
// complete an unrelated future on id collision, or
// (worse, on null id) fail every open request at once.
JsonRpcError errorToPeer = e.toError();
ForkJoinTask.adapt(() ->
messageHandler.send(errorToPeer, formatter)
).fork();
} catch (Throwable t) {
// Fork error sends off the reader thread to avoid
// deadlock with synchronized send()
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2025 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.moderne.jsonrpc;

import org.jspecify.annotations.Nullable;

import java.io.IOException;

/**
* Thrown by a {@link io.moderne.jsonrpc.handler.MessageHandler} when an inbound
* message cannot be parsed or framed. Distinct from a {@link JsonRpcError}
* returned over the wire by a peer: this represents a local frame-level failure
* that should be reported back to the peer as an error response, not dispatched
* through the open-request correlation map. Mixing the two paths can complete
* an unrelated open client future on a malformed inbound request.
*/
public class JsonRpcReceiveException extends IOException {
private final @Nullable Object id;
private final JsonRpcError.Detail detail;

public JsonRpcReceiveException(@Nullable Object id, JsonRpcError.Detail detail) {
super(detail.getMessage());
this.id = id;
this.detail = detail;
}

public JsonRpcError toError() {
return new JsonRpcError(id, detail);
}

public static JsonRpcError.Detail invalidRequestDetail(@Nullable String message) {
return new JsonRpcError.Detail(-32600, "Invalid Request: " + message, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package io.moderne.jsonrpc.handler;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import io.moderne.jsonrpc.JsonRpcError;
import io.moderne.jsonrpc.JsonRpcMessage;
import io.moderne.jsonrpc.JsonRpcReceiveException;
import io.moderne.jsonrpc.formatter.MessageFormatter;
import lombok.RequiredArgsConstructor;
import org.jspecify.annotations.Nullable;
Expand All @@ -38,7 +35,6 @@
@RequiredArgsConstructor
public class HeaderDelimitedMessageHandler implements MessageHandler {
private static final Pattern CONTENT_LENGTH = Pattern.compile("Content-Length: (\\d+)");
private static final JsonFactory JSON_FACTORY = new JsonFactory();

private final InputStream inputStream;
private final OutputStream outputStream;
Expand Down Expand Up @@ -75,19 +71,20 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
String contentLength = readLineFromInputStream();
Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength);
if (!contentLengthMatcher.matches()) {
return JsonRpcError.invalidRequest(null,
"Expected Content-Length header but received '" + contentLength + "'");
throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail(
"Expected Content-Length header but received '" + contentLength + "'"));
}

String contentType = readLineFromInputStream();
if (!contentType.isEmpty()) {
if (!contentType.startsWith("Content-Type")) {
return JsonRpcError.invalidRequest(null,
"Expected Content-Type header but received '" + contentType + "'");
throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail(
"Expected Content-Type header but received '" + contentType + "'"));
}
// now the next line should be an empty line
if (!readLineFromInputStream().isEmpty()) {
return JsonRpcError.invalidRequest(null, "Expected empty line after headers");
throw new JsonRpcReceiveException(null,
JsonRpcReceiveException.invalidRequestDetail("Expected empty line after headers"));
}
}

Expand All @@ -106,47 +103,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {

ByteArrayInputStream bis = new ByteArrayInputStream(content);
return effectiveFormatter.deserialize(bis);
} catch (EOFException e) {
} catch (EOFException | JsonRpcReceiveException e) {
throw e;
} catch (IOException e) {
return JsonRpcError.invalidRequest(extractId(content), e.getMessage());
// Frame- or parse-level failure on an inbound message. Surface it
// as JsonRpcReceiveException so JsonRpc.bind() routes it back to
// the peer rather than completing an unrelated open client future
// (whose id might collide with the extracted id, or trigger the
// null-id "fail all open requests" branch).
throw new JsonRpcReceiveException(IdExtractor.extractId(content),
JsonRpcReceiveException.invalidRequestDetail(e.getMessage()));
}
}

/**
* Try to extract the JSON-RPC "id" field from raw message bytes when full
* deserialization has failed. Uses a streaming parser to read only the
* top-level "id" field without parsing the rest of the message.
*/
private static @Nullable Object extractId(byte @Nullable [] content) {
if (content == null) {
return null;
}
try (JsonParser parser = JSON_FACTORY.createParser(content)) {
if (parser.nextToken() != JsonToken.START_OBJECT) {
return null;
}
while (parser.nextToken() != JsonToken.END_OBJECT) {
String field = parser.currentName();
parser.nextToken();
if ("id".equals(field)) {
switch (parser.currentToken()) {
case VALUE_NUMBER_INT:
return parser.getIntValue();
case VALUE_STRING:
return parser.getText();
default:
return null;
}
}
// Skip over values of other top-level fields without parsing them
parser.skipChildren();
}
} catch (IOException ignored) {
}
return null;
}

private String readLineFromInputStream() throws IOException {
StringBuilder sb = new StringBuilder();
int c = inputStream.read();
Expand Down
67 changes: 67 additions & 0 deletions src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2025 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* https://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.moderne.jsonrpc.handler;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.jspecify.annotations.Nullable;

import java.io.IOException;

/**
* Recovery utility for handlers: try to extract the JSON-RPC "id" field from
* raw message bytes when full deserialization has failed, so the error response
* we send back to the peer can correlate with their original request.
* <p>
* Uses a streaming parser to read only the top-level "id" field without parsing
* the rest of the (possibly malformed) message.
*/
final class IdExtractor {
private static final JsonFactory JSON_FACTORY = new JsonFactory();

private IdExtractor() {
}

static @Nullable Object extractId(byte @Nullable [] content) {
if (content == null) {
return null;
}
try (JsonParser parser = JSON_FACTORY.createParser(content)) {
if (parser.nextToken() != JsonToken.START_OBJECT) {
return null;
}
while (parser.nextToken() != JsonToken.END_OBJECT) {
String field = parser.currentName();
parser.nextToken();
if ("id".equals(field)) {
switch (parser.currentToken()) {
case VALUE_NUMBER_INT:
return parser.getIntValue();
case VALUE_STRING:
return parser.getText();
default:
return null;
}
}
// Skip over values of other top-level fields without parsing them
parser.skipChildren();
}
} catch (IOException ignored) {
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moderne.jsonrpc.handler;

import io.moderne.jsonrpc.JsonRpcMessage;
import io.moderne.jsonrpc.JsonRpcReceiveException;
import io.moderne.jsonrpc.formatter.MessageFormatter;
import lombok.RequiredArgsConstructor;

Expand All @@ -35,15 +36,36 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b = inputStream.read();
if (b == -1) {
// Stream closed cleanly between messages — let the reader loop
// shut down rather than spin treating EOF as a parse failure.
throw new EOFException("Stream closed");
}
boolean foundNewline = false;
do {
buffer.write(b);
if (b == '\n') {
foundNewline = true;
break;
}
} while ((b = inputStream.read()) != -1);
return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray()));
if (!foundNewline) {
// Bytes read but no terminating newline before EOF — peer died
// mid-message. Match HeaderDelimitedMessageHandler's mid-message
// EOF behavior so the reader loop exits instead of treating
// partial bytes as a recoverable parse failure.
throw new EOFException("Stream closed mid-message after " + buffer.size() + " bytes");
}
byte[] content = buffer.toByteArray();
try {
return formatter.deserialize(new ByteArrayInputStream(content));
} catch (IOException e) {
// Parse failure on a complete frame. Surface as JsonRpcReceiveException
// so JsonRpc.bind() routes the error back to the peer rather than
// letting it fall through to the generic Throwable catch (which
// would lose the extracted id and the proper Invalid Request code).
throw new JsonRpcReceiveException(IdExtractor.extractId(content),
JsonRpcReceiveException.invalidRequestDetail(e.getMessage()));
}
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions src/test/java/io/moderne/jsonrpc/JsonRpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -164,6 +165,55 @@ void readerLoopExitsCleanlyOnEof() throws Exception {
}
}

@Test
void malformedInboundDoesNotCompleteOpenClientRequests() throws Exception {
// Pre-fix bug: HeaderDelimitedMessageHandler returned JsonRpcError for
// frame/parse failures. JsonRpc.bind treated every JsonRpcError as a
// peer response. A malformed inbound message with no extractable id
// hit the "Error with no id — fail all open requests" branch and
// completed every open client future exceptionally — even though the
// peer never sent us anything correlated to those requests.
//
// Post-fix: handler throws JsonRpcReceiveException; JsonRpc.bind
// routes that to the peer as an error response and leaves
// openRequests untouched.
PipedOutputStream peerToServer = new PipedOutputStream();
PipedInputStream serverIn = new PipedInputStream(peerToServer);
ByteArrayOutputStream serverOut = new ByteArrayOutputStream();
JsonMessageFormatter formatter = new JsonMessageFormatter();
JsonRpc localRpc = new JsonRpc(
new HeaderDelimitedMessageHandler(serverIn, serverOut),
formatter).bind();
try {
CompletableFuture<JsonRpcSuccess> open =
localRpc.send(JsonRpcRequest.newRequest("waiting"));

// 5 ASCII bytes that are not valid JSON. extractId() returns null.
byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
byte[] frame = ("Content-Length: " + body.length + "\r\n\r\n")
.getBytes(StandardCharsets.UTF_8);
peerToServer.write(frame);
peerToServer.write(body);
peerToServer.flush();

// Wait for the error response to land in serverOut.
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
while (System.nanoTime() < deadline &&
!serverOut.toString(StandardCharsets.UTF_8).contains("Invalid Request")) {
Thread.sleep(20);
}

assertThat(serverOut.toString(StandardCharsets.UTF_8))
.as("error response sent back to peer for malformed inbound")
.contains("Invalid Request");
assertThat(open.isDone())
.as("open client request stays pending — malformed inbound is not a response to it")
.isFalse();
} finally {
localRpc.shutdown();
}
}

record Person(String name) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
*/
package io.moderne.jsonrpc.handler;

import io.moderne.jsonrpc.JsonRpcError;
import io.moderne.jsonrpc.JsonRpcMessage;
import io.moderne.jsonrpc.JsonRpcReceiveException;
import io.moderne.jsonrpc.formatter.JsonMessageFormatter;
import org.junit.jupiter.api.Test;

Expand All @@ -25,7 +24,6 @@
import java.io.EOFException;
import java.io.InputStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class HeaderDelimitedMessageHandlerTest {
Expand All @@ -41,17 +39,17 @@ void receiveThrowsEofWhenStreamClosedBetweenMessages() {
}

@Test
void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception {
// A non-RPC line on the wire should still produce a recoverable
// invalidRequest error (not EOF) so callers can decide what to do.
void receiveThrowsForNonEmptyMalformedHeader() {
// A non-RPC line on the wire surfaces as JsonRpcReceiveException
// (not EOF, not a returned JsonRpcError that JsonRpc.bind would
// mistake for a peer response). The caller — JsonRpc.bind — turns
// it into an error response sent back to the peer.
InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes());
HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream());

JsonRpcMessage msg = handler.receive(FORMATTER);

assertThat(msg).isInstanceOf(JsonRpcError.class);
assertThat(((JsonRpcError) msg).getError().getMessage())
.contains("Expected Content-Length header");
assertThatThrownBy(() -> handler.receive(FORMATTER))
.isInstanceOf(JsonRpcReceiveException.class)
.hasMessageContaining("Expected Content-Length header");
}

@Test
Expand Down
Loading
Loading