diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index 220c425..a102057 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -130,6 +130,18 @@ protected void compute() { } openRequests.clear(); shutdown = true; + } catch (JsonRpcReceiveException e) { + // Frame- or parse-level failure on an inbound message. + // Send the error back to the peer; do NOT touch + // openRequests — those track responses we're waiting + // for from the peer, and the peer's malformed message + // is not one of them. Treating it as one would either + // complete an unrelated future on id collision, or + // (worse, on null id) fail every open request at once. + JsonRpcError errorToPeer = e.toError(); + ForkJoinTask.adapt(() -> + messageHandler.send(errorToPeer, formatter) + ).fork(); } catch (Throwable t) { // Fork error sends off the reader thread to avoid // deadlock with synchronized send() diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java b/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java new file mode 100644 index 0000000..4a336c9 --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcReceiveException.java @@ -0,0 +1,47 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc; + +import org.jspecify.annotations.Nullable; + +import java.io.IOException; + +/** + * Thrown by a {@link io.moderne.jsonrpc.handler.MessageHandler} when an inbound + * message cannot be parsed or framed. Distinct from a {@link JsonRpcError} + * returned over the wire by a peer: this represents a local frame-level failure + * that should be reported back to the peer as an error response, not dispatched + * through the open-request correlation map. Mixing the two paths can complete + * an unrelated open client future on a malformed inbound request. + */ +public class JsonRpcReceiveException extends IOException { + private final @Nullable Object id; + private final JsonRpcError.Detail detail; + + public JsonRpcReceiveException(@Nullable Object id, JsonRpcError.Detail detail) { + super(detail.getMessage()); + this.id = id; + this.detail = detail; + } + + public JsonRpcError toError() { + return new JsonRpcError(id, detail); + } + + public static JsonRpcError.Detail invalidRequestDetail(@Nullable String message) { + return new JsonRpcError.Detail(-32600, "Invalid Request: " + message, null); + } +} diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index 1900248..b44575b 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -15,11 +15,8 @@ */ package io.moderne.jsonrpc.handler; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import io.moderne.jsonrpc.JsonRpcError; import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; import org.jspecify.annotations.Nullable; @@ -38,7 +35,6 @@ @RequiredArgsConstructor public class HeaderDelimitedMessageHandler implements MessageHandler { private static final Pattern CONTENT_LENGTH = Pattern.compile("Content-Length: (\\d+)"); - private static final JsonFactory JSON_FACTORY = new JsonFactory(); private final InputStream inputStream; private final OutputStream outputStream; @@ -75,19 +71,20 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { String contentLength = readLineFromInputStream(); Matcher contentLengthMatcher = CONTENT_LENGTH.matcher(contentLength); if (!contentLengthMatcher.matches()) { - return JsonRpcError.invalidRequest(null, - "Expected Content-Length header but received '" + contentLength + "'"); + throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail( + "Expected Content-Length header but received '" + contentLength + "'")); } String contentType = readLineFromInputStream(); if (!contentType.isEmpty()) { if (!contentType.startsWith("Content-Type")) { - return JsonRpcError.invalidRequest(null, - "Expected Content-Type header but received '" + contentType + "'"); + throw new JsonRpcReceiveException(null, JsonRpcReceiveException.invalidRequestDetail( + "Expected Content-Type header but received '" + contentType + "'")); } // now the next line should be an empty line if (!readLineFromInputStream().isEmpty()) { - return JsonRpcError.invalidRequest(null, "Expected empty line after headers"); + throw new JsonRpcReceiveException(null, + JsonRpcReceiveException.invalidRequestDetail("Expected empty line after headers")); } } @@ -106,47 +103,19 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { ByteArrayInputStream bis = new ByteArrayInputStream(content); return effectiveFormatter.deserialize(bis); - } catch (EOFException e) { + } catch (EOFException | JsonRpcReceiveException e) { throw e; } catch (IOException e) { - return JsonRpcError.invalidRequest(extractId(content), e.getMessage()); + // Frame- or parse-level failure on an inbound message. Surface it + // as JsonRpcReceiveException so JsonRpc.bind() routes it back to + // the peer rather than completing an unrelated open client future + // (whose id might collide with the extracted id, or trigger the + // null-id "fail all open requests" branch). + throw new JsonRpcReceiveException(IdExtractor.extractId(content), + JsonRpcReceiveException.invalidRequestDetail(e.getMessage())); } } - /** - * Try to extract the JSON-RPC "id" field from raw message bytes when full - * deserialization has failed. Uses a streaming parser to read only the - * top-level "id" field without parsing the rest of the message. - */ - private static @Nullable Object extractId(byte @Nullable [] content) { - if (content == null) { - return null; - } - try (JsonParser parser = JSON_FACTORY.createParser(content)) { - if (parser.nextToken() != JsonToken.START_OBJECT) { - return null; - } - while (parser.nextToken() != JsonToken.END_OBJECT) { - String field = parser.currentName(); - parser.nextToken(); - if ("id".equals(field)) { - switch (parser.currentToken()) { - case VALUE_NUMBER_INT: - return parser.getIntValue(); - case VALUE_STRING: - return parser.getText(); - default: - return null; - } - } - // Skip over values of other top-level fields without parsing them - parser.skipChildren(); - } - } catch (IOException ignored) { - } - return null; - } - private String readLineFromInputStream() throws IOException { StringBuilder sb = new StringBuilder(); int c = inputStream.read(); diff --git a/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java b/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java new file mode 100644 index 0000000..9bfcd4b --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/handler/IdExtractor.java @@ -0,0 +1,67 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import org.jspecify.annotations.Nullable; + +import java.io.IOException; + +/** + * Recovery utility for handlers: try to extract the JSON-RPC "id" field from + * raw message bytes when full deserialization has failed, so the error response + * we send back to the peer can correlate with their original request. + *

+ * Uses a streaming parser to read only the top-level "id" field without parsing + * the rest of the (possibly malformed) message. + */ +final class IdExtractor { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + + private IdExtractor() { + } + + static @Nullable Object extractId(byte @Nullable [] content) { + if (content == null) { + return null; + } + try (JsonParser parser = JSON_FACTORY.createParser(content)) { + if (parser.nextToken() != JsonToken.START_OBJECT) { + return null; + } + while (parser.nextToken() != JsonToken.END_OBJECT) { + String field = parser.currentName(); + parser.nextToken(); + if ("id".equals(field)) { + switch (parser.currentToken()) { + case VALUE_NUMBER_INT: + return parser.getIntValue(); + case VALUE_STRING: + return parser.getText(); + default: + return null; + } + } + // Skip over values of other top-level fields without parsing them + parser.skipChildren(); + } + } catch (IOException ignored) { + } + return null; + } +} diff --git a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java index e422391..0d4a311 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java @@ -16,6 +16,7 @@ package io.moderne.jsonrpc.handler; import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.RequiredArgsConstructor; @@ -35,15 +36,36 @@ public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); int b = inputStream.read(); if (b == -1) { + // Stream closed cleanly between messages — let the reader loop + // shut down rather than spin treating EOF as a parse failure. throw new EOFException("Stream closed"); } + boolean foundNewline = false; do { buffer.write(b); if (b == '\n') { + foundNewline = true; break; } } while ((b = inputStream.read()) != -1); - return formatter.deserialize(new ByteArrayInputStream(buffer.toByteArray())); + if (!foundNewline) { + // Bytes read but no terminating newline before EOF — peer died + // mid-message. Match HeaderDelimitedMessageHandler's mid-message + // EOF behavior so the reader loop exits instead of treating + // partial bytes as a recoverable parse failure. + throw new EOFException("Stream closed mid-message after " + buffer.size() + " bytes"); + } + byte[] content = buffer.toByteArray(); + try { + return formatter.deserialize(new ByteArrayInputStream(content)); + } catch (IOException e) { + // Parse failure on a complete frame. Surface as JsonRpcReceiveException + // so JsonRpc.bind() routes the error back to the peer rather than + // letting it fall through to the generic Throwable catch (which + // would lose the extracted id and the proper Invalid Request code). + throw new JsonRpcReceiveException(IdExtractor.extractId(content), + JsonRpcReceiveException.invalidRequestDetail(e.getMessage())); + } } @Override diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index 122441b..6bd024e 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -164,6 +165,55 @@ void readerLoopExitsCleanlyOnEof() throws Exception { } } + @Test + void malformedInboundDoesNotCompleteOpenClientRequests() throws Exception { + // Pre-fix bug: HeaderDelimitedMessageHandler returned JsonRpcError for + // frame/parse failures. JsonRpc.bind treated every JsonRpcError as a + // peer response. A malformed inbound message with no extractable id + // hit the "Error with no id — fail all open requests" branch and + // completed every open client future exceptionally — even though the + // peer never sent us anything correlated to those requests. + // + // Post-fix: handler throws JsonRpcReceiveException; JsonRpc.bind + // routes that to the peer as an error response and leaves + // openRequests untouched. + PipedOutputStream peerToServer = new PipedOutputStream(); + PipedInputStream serverIn = new PipedInputStream(peerToServer); + ByteArrayOutputStream serverOut = new ByteArrayOutputStream(); + JsonMessageFormatter formatter = new JsonMessageFormatter(); + JsonRpc localRpc = new JsonRpc( + new HeaderDelimitedMessageHandler(serverIn, serverOut), + formatter).bind(); + try { + CompletableFuture open = + localRpc.send(JsonRpcRequest.newRequest("waiting")); + + // 5 ASCII bytes that are not valid JSON. extractId() returns null. + byte[] body = "hello".getBytes(StandardCharsets.UTF_8); + byte[] frame = ("Content-Length: " + body.length + "\r\n\r\n") + .getBytes(StandardCharsets.UTF_8); + peerToServer.write(frame); + peerToServer.write(body); + peerToServer.flush(); + + // Wait for the error response to land in serverOut. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline && + !serverOut.toString(StandardCharsets.UTF_8).contains("Invalid Request")) { + Thread.sleep(20); + } + + assertThat(serverOut.toString(StandardCharsets.UTF_8)) + .as("error response sent back to peer for malformed inbound") + .contains("Invalid Request"); + assertThat(open.isDone()) + .as("open client request stays pending — malformed inbound is not a response to it") + .isFalse(); + } finally { + localRpc.shutdown(); + } + } + record Person(String name) { } diff --git a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java index e312e6f..0cdff1b 100644 --- a/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java +++ b/src/test/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandlerTest.java @@ -15,8 +15,7 @@ */ package io.moderne.jsonrpc.handler; -import io.moderne.jsonrpc.JsonRpcError; -import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.JsonMessageFormatter; import org.junit.jupiter.api.Test; @@ -25,7 +24,6 @@ import java.io.EOFException; import java.io.InputStream; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class HeaderDelimitedMessageHandlerTest { @@ -41,17 +39,17 @@ void receiveThrowsEofWhenStreamClosedBetweenMessages() { } @Test - void receiveStillReturnsInvalidRequestForNonEmptyMalformedHeader() throws Exception { - // A non-RPC line on the wire should still produce a recoverable - // invalidRequest error (not EOF) so callers can decide what to do. + void receiveThrowsForNonEmptyMalformedHeader() { + // A non-RPC line on the wire surfaces as JsonRpcReceiveException + // (not EOF, not a returned JsonRpcError that JsonRpc.bind would + // mistake for a peer response). The caller — JsonRpc.bind — turns + // it into an error response sent back to the peer. InputStream noise = new ByteArrayInputStream("warning: something\n".getBytes()); HeaderDelimitedMessageHandler handler = new HeaderDelimitedMessageHandler(noise, new ByteArrayOutputStream()); - JsonRpcMessage msg = handler.receive(FORMATTER); - - assertThat(msg).isInstanceOf(JsonRpcError.class); - assertThat(((JsonRpcError) msg).getError().getMessage()) - .contains("Expected Content-Length header"); + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(JsonRpcReceiveException.class) + .hasMessageContaining("Expected Content-Length header"); } @Test diff --git a/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java b/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java new file mode 100644 index 0000000..c806869 --- /dev/null +++ b/src/test/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandlerTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.handler; + +import io.moderne.jsonrpc.JsonRpcReceiveException; +import io.moderne.jsonrpc.JsonRpcRequest; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class NewLineDelimitedMessageHandlerTest { + private static final JsonMessageFormatter FORMATTER = new JsonMessageFormatter(); + + @Test + void receiveDeserializesCompleteFrame() throws Exception { + InputStream in = new ByteArrayInputStream( + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"ping\"}\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(in, new ByteArrayOutputStream()); + + JsonRpcRequest msg = (JsonRpcRequest) handler.receive(FORMATTER); + + assertThat(msg.getMethod()).isEqualTo("ping"); + assertThat(msg.getId()).isEqualTo(1); + } + + @Test + void receiveThrowsEofWhenStreamClosedBetweenMessages() { + InputStream empty = new ByteArrayInputStream(new byte[0]); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(empty, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("Stream closed"); + } + + @Test + void receiveThrowsEofWhenStreamClosesMidMessage() { + // Bytes received, no terminating newline, then EOF — peer died. + // Must match HeaderDelimitedMessageHandler's mid-message EOF so the + // reader loop shuts down instead of trying to parse partial bytes. + InputStream truncated = new ByteArrayInputStream("{\"partial".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(truncated, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(EOFException.class) + .hasMessageContaining("mid-message"); + } + + @Test + void receiveThrowsForMalformedJsonInCompleteFrame() { + // A complete frame (newline-terminated) whose contents fail to parse + // surfaces as JsonRpcReceiveException — NOT as a returned JsonRpcError + // that JsonRpc.bind would mistake for a peer response. + InputStream noise = new ByteArrayInputStream("hello world\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(noise, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOf(JsonRpcReceiveException.class) + .hasMessageContaining("Invalid Request"); + } + + @Test + void receiveExtractsIdFromMalformedFrameWherePossible() { + // Truncated JSON object that has a parseable id field. The recovery + // path should pick up the id so the error response we send back can + // correlate with the peer's original request. + InputStream framed = new ByteArrayInputStream( + "{\"id\":42,\"method\":\"oops\",\"params\":{\"unterminated\n".getBytes(StandardCharsets.UTF_8)); + NewLineDelimitedMessageHandler handler = new NewLineDelimitedMessageHandler(framed, new ByteArrayOutputStream()); + + assertThatThrownBy(() -> handler.receive(FORMATTER)) + .isInstanceOfSatisfying(JsonRpcReceiveException.class, e -> { + assertThat(e.toError().getId()).isEqualTo(42); + }); + } +}