From 56a06d202454651fbada0ffb3004bc25a32bc724 Mon Sep 17 00:00:00 2001 From: Jonathan Schneider Date: Wed, 6 May 2026 19:52:29 -0400 Subject: [PATCH] Route handler-level parse errors back to peer instead of completing client futures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HeaderDelimitedMessageHandler.receive() returned JsonRpcError.invalidRequest(...) for frame and parse failures on inbound messages. JsonRpc.bind() then treated every JsonRpcError as a peer response and dispatched it through openRequests based on id. With PR #14's extractId() recovering an id from malformed bytes where possible, this caused two distinct silent failure modes: - if the extracted id collided with an open client request, that unrelated client future was completed exceptionally with the peer's malformed message treated as a wire error response; - if the extracted id was null (the more common case), the "Error with no id" branch failed every open client future at once. In neither case was the actual error sent back to the peer. Introduce JsonRpcReceiveException carrying the recovered id and an JsonRpcError.Detail. Both HeaderDelimitedMessageHandler and NewLineDelimitedMessageHandler now throw it for frame and parse failures. JsonRpc.bind() catches it and forks the error response back to the peer; openRequests is left untouched. NewLineDelimitedMessageHandler also now distinguishes mid-message EOF (peer died with partial bytes — throws EOFException, matches Header) from a parse failure on a complete frame (throws JsonRpcReceiveException with id extraction where possible). Extracts the previously private extractId logic into IdExtractor so both handlers share it. --- src/main/java/io/moderne/jsonrpc/JsonRpc.java | 12 +++ .../jsonrpc/JsonRpcReceiveException.java | 47 +++++++++ .../HeaderDelimitedMessageHandler.java | 61 +++--------- .../moderne/jsonrpc/handler/IdExtractor.java | 67 +++++++++++++ .../NewLineDelimitedMessageHandler.java | 24 ++++- .../java/io/moderne/jsonrpc/JsonRpcTest.java | 50 ++++++++++ .../HeaderDelimitedMessageHandlerTest.java | 20 ++-- .../NewLineDelimitedMessageHandlerTest.java | 97 +++++++++++++++++++ 8 files changed, 320 insertions(+), 58 deletions(-) create mode 100644 src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java create mode 100644 src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java create mode 100644 src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index 220c425..a102057 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -130,6 +130,18 @@ protected void compute() { } openRequests.clear(); shutdown = true; + } catch (JsonRpcReceiveException e) { + // Frame- or parse-level failure on an inbound message. + // Send the error back to the peer; do NOT touch + // openRequests — those track responses we're waiting + // for from the peer, and the peer's malformed message + // is not one of them. Treating it as one would either + // complete an unrelated future on id collision, or + // (worse, on null id) fail every open request at once. + JsonRpcError errorToPeer = e.toError(); + ForkJoinTask.adapt(() -> + messageHandler.send(errorToPeer, formatter) + ).fork(); } catch (Throwable t) { // Fork error sends off the reader thread to avoid // deadlock with synchronized send() diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java b/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java new file mode 100644 index 0000000..4a336c9 --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java @@ -0,0 +1,47 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc; + +import org.jspecify.annotations.Nullable; + +import java.io.IOException; + +/** + * Thrown by a {@link io.moderne.jsonrpc.handler.MessageHandler} when an inbound + * message cannot be parsed or framed. Distinct from a {@link JsonRpcError} + * returned over the wire by a peer: this represents a local frame-level failure + * that should be reported back to the peer as an error response, not dispatched + * through the open-request correlation map. Mixing the two paths can complete + * an unrelated open client future on a malformed inbound request. + */ +public class JsonRpcReceiveException extends IOException { + private final @Nullable Object id; + private final JsonRpcError.Detail detail; + + public JsonRpcReceiveException(@Nullable Object id, JsonRpcError.Detail detail) { + super(detail.getMessage()); + this.id = id; + this.detail = detail; + } + + public JsonRpcError toError() { + return new JsonRpcError(id, detail); + } + + public static JsonRpcError.Detail invalidRequestDetail(@Nullable String message) { + return new JsonRpcError.Detail(-32600, "Invalid Request: " + message, null); + } +} diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index 1900248..b44575b 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -15,11 +15,8 @@ */ package io.moderne.jsonrpc.handler; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import io.moderne.jsonrpc.JsonRpcError; import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; import org.jspecify.annotations.Nullable; @@ -38,7 +35,6 @@ @RequiredArgsConstructor public class HeaderDelimitedMessageHandler implements MessageHandler { private static final Pattern CONTENT_LENGTH = Pattern.compile("Content-Length: (\\d+)"); - private static final JsonFactory JSON_FACTORY = new JsonFactory(); private final InputStream inputStream; private final OutputStream outputStream; @@ -75,19 +71,20 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { String contentLength = readLineFromInputStream(); Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength); if (!contentLengthMatcher.matches()) { - return JsonRpcError.invalidRequest(null, - "Expected Content-Length header but received '" + contentLength + "'"); + throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail( + "Expected Content-Length header but received '" + contentLength + "'")); } String contentType = readLineFromInputStream(); if (!contentType.isEmpty()) { if (!contentType.startsWith("Content-Type")) { - return JsonRpcError.invalidRequest(null, - "Expected Content-Type header but received '" + contentType + "'"); + throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail( + "Expected Content-Type header but received '" + contentType + "'")); } // now the next line should be an empty line if (!readLineFromInputStream().isEmpty()) { - return JsonRpcError.invalidRequest(null, "Expected empty line after headers"); + throw new JsonRpcReceiveException(null, + JsonRpcReceiveException.invalidRequestDetail("Expected empty line after headers")); } } @@ -106,47 +103,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { ByteArrayInputStream bis = new ByteArrayInputStream(content); return effectiveFormatter.deserialize(bis); - } catch (EOFException e) { + } catch (EOFException | JsonRpcReceiveException e) { throw e; } catch (IOException e) { - return JsonRpcError.invalidRequest(extractId(content), e.getMessage()); + // Frame- or parse-level failure on an inbound message. Surface it + // as JsonRpcReceiveException so JsonRpc.bind() routes it back to + // the peer rather than completing an unrelated open client future + // (whose id might collide with the extracted id, or trigger the + // null-id "fail all open requests" branch). + throw new JsonRpcReceiveException(IdExtractor.extractId(content), + JsonRpcReceiveException.invalidRequestDetail(e.getMessage())); } } - /** - * Try to extract the JSON-RPC "id" field from raw message bytes when full - * deserialization has failed. Uses a streaming parser to read only the - * top-level "id" field without parsing the rest of the message. - */ - private static @Nullable Object extractId(byte @Nullable [] content) { - if (content == null) { - return null; - } - try (JsonParser parser = JSON_FACTORY.createParser(content)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String field = parser.currentName(); - parser.nextToken(); - if ("id".equals(field)) { - switch (parser.currentToken()) { - case VALUE_NUMBER_INT: - return parser.getIntValue(); - case VALUE_STRING: - return parser.getText(); - default: - return null; - } - } - // Skip over values of other top-level fields without parsing them - parser.skipChildren(); - } - } catch (IOException ignored) { - } - return null; - } - private String readLineFromInputStream() throws IOException { StringBuilder sb = new StringBuilder(); int c = inputStream.read(); diff --git a/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java b/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java new file mode 100644 index 0000000..9bfcd4b --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java @@ -0,0 +1,67 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import org.jspecify.annotations.Nullable; + +import java.io.IOException; + +/** + * Recovery utility for handlers: try to extract the JSON-RPC "id" field from + * raw message bytes when full deserialization has failed, so the error response + * we send back to the peer can correlate with their original request. + *

+ * Uses a streaming parser to read only the top-level "id" field without parsing + * the rest of the (possibly malformed) message. + */ +final class IdExtractor { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + + private IdExtractor() { + } + + static @Nullable Object extractId(byte @Nullable [] content) { + if (content == null) { + return null; + } + try (JsonParser parser = JSON_FACTORY.createParser(content)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String field = parser.currentName(); + parser.nextToken(); + if ("id".equals(field)) { + switch (parser.currentToken()) { + case VALUE_NUMBER_INT: + return parser.getIntValue(); + case VALUE_STRING: + return parser.getText(); + default: + return null; + } + } + // Skip over values of other top-level fields without parsing them + parser.skipChildren(); + } + } catch (IOException ignored) { + } + return null; + } +} diff --git a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java index e422391..0d4a311 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java @@ -16,6 +16,7 @@ package io.moderne.jsonrpc.handler; import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; @@ -35,15 +36,36 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); int b = inputStream.read(); if (b == -1) { + // Stream closed cleanly between messages — let the reader loop + // shut down rather than spin treating EOF as a parse failure. throw new EOFException("Stream closed"); } + boolean foundNewline = false; do { buffer.write(b); if (b == '\n') { + foundNewline = true; break; } } while ((b = inputStream.read()) != -1); - return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); + if (!foundNewline) { + // Bytes read but no terminating newline before EOF — peer died + // mid-message. Match HeaderDelimitedMessageHandler's mid-message + // EOF behavior so the reader loop exits instead of treating + // partial bytes as a recoverable parse failure. + throw new EOFException("Stream closed mid-message after " + buffer.size() + " bytes"); + } + byte[] content = buffer.toByteArray(); + try { + return formatter.deserialize(new ByteArrayInputStream(content)); + } catch (IOException e) { + // Parse failure on a complete frame. Surface as JsonRpcReceiveException + // so JsonRpc.bind() routes the error back to the peer rather than + // letting it fall through to the generic Throwable catch (which + // would lose the extracted id and the proper Invalid Request code). + throw new JsonRpcReceiveException(IdExtractor.extractId(content), + JsonRpcReceiveException.invalidRequestDetail(e.getMessage())); + } } @Override diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index 122441b..6bd024e 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -164,6 +165,55 @@ void readerLoopExitsCleanlyOnEof() throws Exception { } } + @Test + void malformedInboundDoesNotCompleteOpenClientRequests() throws Exception { + // Pre-fix bug: HeaderDelimitedMessageHandler returned JsonRpcError for + // frame/parse failures. JsonRpc.bind treated every JsonRpcError as a + // peer response. A malformed inbound message with no extractable id + // hit the "Error with no id — fail all open requests" branch and + // completed every open client future exceptionally — even though the + // peer never sent us anything correlated to those requests. + // + // Post-fix: handler throws JsonRpcReceiveException; JsonRpc.bind + // routes that to the peer as an error response and leaves + // openRequests untouched. + PipedOutputStream peerToServer = new PipedOutputStream(); + PipedInputStream serverIn = new PipedInputStream(peerToServer); + ByteArrayOutputStream serverOut = new ByteArrayOutputStream(); + JsonMessageFormatter formatter = new JsonMessageFormatter(); + JsonRpc localRpc = new JsonRpc( + new HeaderDelimitedMessageHandler(serverIn, serverOut), + formatter).bind(); + try { + CompletableFuture open = + localRpc.send(JsonRpcRequest.newRequest("waiting")); + + // 5 ASCII bytes that are not valid JSON. extractId() returns null. + byte[] body = "hello".getBytes(StandardCharsets.UTF_8); + byte[] frame = ("Content-Length: " + body.length + "\r\n\r\n") + .getBytes(StandardCharsets.UTF_8); + peerToServer.write(frame); + peerToServer.write(body); + peerToServer.flush(); + + // Wait for the error response to land in serverOut. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline && + !serverOut.toString(StandardCharsets.UTF_8).contains("Invalid Request")) { + Thread.sleep(20); + } + + assertThat(serverOut.toString(StandardCharsets.UTF_8)) + .as("error response sent back to peer for malformed inbound") + .contains("Invalid Request"); + assertThat(open.isDone()) + .as("open client request stays pending — malformed inbound is not a response to it") + .isFalse(); + } finally { + localRpc.shutdown(); + } + } + record Person(String name) { } diff --git a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java index e312e6f..0cdff1b 100644 --- a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java +++ b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java @@ -15,8 +15,7 @@ */ package io.moderne.jsonrpc.handler; -import io.moderne.jsonrpc.JsonRpcError; -import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.JsonMessageFormatter; import org.junit.jupiter.api.Test; @@ -25,7 +24,6 @@ import java.io.EOFException; import java.io.InputStream; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class HeaderDelimitedMessageHandlerTest { @@ -41,17 +39,17 @@ void receiveThrowsEofWhenStreamClosedBetweenMessages() { } @Test - void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception { - // A non-RPC line on the wire should still produce a recoverable - // invalidRequest error (not EOF) so callers can decide what to do. + void receiveThrowsForNonEmptyMalformedHeader() { + // A non-RPC line on the wire surfaces as JsonRpcReceiveException + // (not EOF, not a returned JsonRpcError that JsonRpc.bind would + // mistake for a peer response). The caller — JsonRpc.bind — turns + // it into an error response sent back to the peer. InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes()); HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream()); - JsonRpcMessage msg = handler.receive(FORMATTER); - - assertThat(msg).isInstanceOf(JsonRpcError.class); - assertThat(((JsonRpcError) msg).getError().getMessage()) - .contains("Expected Content-Length header"); + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(JsonRpcReceiveException.class) + .hasMessageContaining("Expected Content-Length header"); } @Test diff --git a/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java new file mode 100644 index 0000000..c806869 --- /dev/null +++ b/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import io.moderne.jsonrpc.JsonRpcReceiveException; +import io.moderne.jsonrpc.JsonRpcRequest; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class NewLineDelimitedMessageHandlerTest { + private static final JsonMessageFormatter FORMATTER = new JsonMessageFormatter(); + + @Test + void receiveDeserializesCompleteFrame() throws Exception { + InputStream in = new ByteArrayInputStream( + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(in, new ByteArrayOutputStream()); + + JsonRpcRequest msg = (JsonRpcRequest) handler.receive(FORMATTER); + + assertThat(msg.getMethod()).isEqualTo("ping"); + assertThat(msg.getId()).isEqualTo(1); + } + + @Test + void receiveThrowsEofWhenStreamClosedBetweenMessages() { + InputStream empty = new ByteArrayInputStream(new byte[0]); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(empty, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Stream closed"); + } + + @Test + void receiveThrowsEofWhenStreamClosesMidMessage() { + // Bytes received, no terminating newline, then EOF — peer died. + // Must match HeaderDelimitedMessageHandler's mid-message EOF so the + // reader loop shuts down instead of trying to parse partial bytes. + InputStream truncated = new ByteArrayInputStream("{\"partial".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(truncated, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("mid-message"); + } + + @Test + void receiveThrowsForMalformedJsonInCompleteFrame() { + // A complete frame (newline-terminated) whose contents fail to parse + // surfaces as JsonRpcReceiveException — NOT as a returned JsonRpcError + // that JsonRpc.bind would mistake for a peer response. + InputStream noise = new ByteArrayInputStream("hello world\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(noise, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(JsonRpcReceiveException.class) + .hasMessageContaining("Invalid Request"); + } + + @Test + void receiveExtractsIdFromMalformedFrameWherePossible() { + // Truncated JSON object that has a parseable id field. The recovery + // path should pick up the id so the error response we send back can + // correlate with the peer's original request. + InputStream framed = new ByteArrayInputStream( + "{\"id\":42,\"method\":\"oops\",\"params\":{\"unterminated\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(framed, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOfSatisfying(JsonRpcReceiveException.class, e -> { + assertThat(e.toError().getId()).isEqualTo(42); + }); + } +}