Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/main/java/io/moderne/jsonrpc/JsonRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public <P> JsonRpc rpc(String name, JsonRpcMethod<P> method) {
public CompletableFuture<JsonRpcSuccess> send(JsonRpcRequest request) {
CompletableFuture<JsonRpcSuccess> response = new CompletableFuture<>();
openRequests.put(request.getId(), response);
if (shutdown) {
// Reader loop already exited (peer EOF or explicit shutdown) and
// may have drained openRequests before our put. Fail the future
// ourselves; completeExceptionally is a no-op if the reader did
// observe our entry and failed it first.
openRequests.remove(request.getId());
response.completeExceptionally(new JsonRpcException(
JsonRpcError.internalError(null, "JSON-RPC peer closed the stream")));
return response;
}
messageHandler.send(request, formatter);
return response;
}
Expand Down Expand Up @@ -113,15 +123,16 @@ protected void compute() {
}
} catch (EOFException e) {
// Peer closed the stream — there's nothing more to read.
// Fail any in-flight requests once and exit the loop
// instead of spinning on the closed pipe.
// Set shutdown FIRST so a concurrent send() observes it
// and fails its own future after put; otherwise a request
// registered after this drain would be stranded.
shutdown = true;
JsonRpcException eof = new JsonRpcException(
JsonRpcError.internalError(null, "JSON-RPC peer closed the stream"));
for (CompletableFuture<JsonRpcSuccess> future : openRequests.values()) {
future.completeExceptionally(eof);
}
openRequests.clear();
shutdown = true;
} catch (JsonRpcReceiveException e) {
// Frame- or parse-level failure on an inbound message.
// Send the error back to the peer; do NOT touch
Expand Down
Loading