From 4314f91e503b4b5603d9a8f9570dce40b7ddaeb4 Mon Sep 17 00:00:00 2001 From: Knut Wannheden Date: Tue, 13 Jan 2026 09:27:47 +0100 Subject: [PATCH 1/2] Replace ObjectMapper with MessageFormatter abstraction Refactor JsonRpcSuccess and JsonRpcMethod to use the MessageFormatter interface instead of their own static ObjectMapper instances. This removes duplicate ObjectMapper configurations and centralizes JSON conversion through the MessageFormatter.convertValue() method. Key changes: - Add convertValue(Object, Type) method to MessageFormatter interface - JsonRpc now takes a MessageFormatter parameter (with deprecated single-arg constructor for backwards compatibility) - JsonRpcSuccess uses injected formatter via factory method - JsonRpcMethod.convertAndHandle() takes formatter parameter Co-Authored-By: Claude Opus 4.5 --- src/main/java/io/moderne/jsonrpc/JsonRpc.java | 13 +++- .../io/moderne/jsonrpc/JsonRpcMethod.java | 24 +------ .../io/moderne/jsonrpc/JsonRpcSuccess.java | 64 ++++++++++--------- .../formatter/JsonMessageFormatter.java | 29 +++++++-- .../jsonrpc/formatter/MessageFormatter.java | 6 ++ .../java/io/moderne/jsonrpc/JsonRpcTest.java | 6 +- 6 files changed, 83 insertions(+), 59 deletions(-) diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index 033666f..7e7c37d 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -15,6 +15,8 @@ */ package io.moderne.jsonrpc; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; +import io.moderne.jsonrpc.formatter.MessageFormatter; import io.moderne.jsonrpc.handler.MessageHandler; import lombok.RequiredArgsConstructor; @@ -31,8 +33,17 @@ public class JsonRpc { private volatile boolean shutdown = false; private final MessageHandler messageHandler; + private final MessageFormatter formatter; private final Map> openRequests = new ConcurrentHashMap<>(); + /** + * @deprecated Use {@link #JsonRpc(MessageHandler, MessageFormatter)} instead. + */ + @Deprecated + public JsonRpc(MessageHandler messageHandler) { + this(messageHandler, new JsonMessageFormatter()); + } + public

JsonRpc rpc(String name, JsonRpcMethod

method) { methods.put(name, method); return this; @@ -78,7 +89,7 @@ protected void compute() { } else { ForkJoinTask.adapt(() -> { try { - Object response = method.convertAndHandle(request.getParams()); + Object response = method.convertAndHandle(request.getParams(), formatter); if (response != null) { messageHandler.send(new JsonRpcSuccess(request.getId(), response)); } else { diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java b/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java index fe6fec0..b0daad7 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java @@ -15,39 +15,21 @@ */ package io.moderne.jsonrpc; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.cfg.ConstructorDetector; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.databind.type.TypeFactory; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; +import io.moderne.jsonrpc.formatter.MessageFormatter; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @SuppressWarnings("unused") public abstract class JsonRpcMethod

{ - private static final ObjectMapper mapper = JsonMapper.builder() - // to be able to construct classes that have @Data and a single field - // see https://cowtowncoder.medium.com/jackson-2-12-most-wanted-3-5-246624e2d3d0 - .constructorDetector(ConstructorDetector.USE_PROPERTIES_BASED) - .build() - .registerModules(new ParameterNamesModule(), new JavaTimeModule()) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - .disable(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES) - .setSerializationInclusion(JsonInclude.Include.NON_NULL); - final Object convertAndHandle(Object params) throws Exception { + final Object convertAndHandle(Object params, MessageFormatter formatter) throws Exception { Type paramType = ((ParameterizedType) getClass().getGenericSuperclass()) .getActualTypeArguments()[0]; if (Void.class.equals(paramType)) { return handle(null); } - JavaType jt = TypeFactory.defaultInstance().constructType(paramType); - return handle(mapper.convertValue(params, jt)); + return handle(formatter.convertValue(params, paramType)); } protected abstract Object handle(P params) throws Exception; diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java b/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java index a4d04bf..b898f6c 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java @@ -15,44 +15,46 @@ */ package io.moderne.jsonrpc; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.cfg.ConstructorDetector; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.moderne.jsonrpc.formatter.MessageFormatter; import lombok.EqualsAndHashCode; -import lombok.Value; +import lombok.Getter; +import lombok.ToString; import org.jspecify.annotations.Nullable; -@Value @EqualsAndHashCode(callSuper = false) +@ToString public class JsonRpcSuccess extends JsonRpcResponse { - private static final ObjectMapper mapper = JsonMapper.builder() - // to be able to construct classes that have @Data and a single field - // see https://cowtowncoder.medium.com/jackson-2-12-most-wanted-3-5-246624e2d3d0 - .constructorDetector(ConstructorDetector.USE_PROPERTIES_BASED) - .build() - .registerModules(new ParameterNamesModule(), new JavaTimeModule()) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - .setSerializationInclusion(JsonInclude.Include.NON_NULL); - - /** - * String or Integer - */ - @JsonDeserialize(using = JsonRpcIdDeserializer.class) - Object id; - - /** - * No need for polymorphic deserialization here, since the result type will - * always be known by the requester. - */ + + @Getter + private final Object id; + + @Getter @Nullable - Object result; + private final Object result; + + @JsonIgnore + @EqualsAndHashCode.Exclude + @ToString.Exclude + @Nullable + private final transient MessageFormatter formatter; + + public JsonRpcSuccess(Object id, @Nullable Object result) { + this(id, result, null); + } + + private JsonRpcSuccess(Object id, @Nullable Object result, @Nullable MessageFormatter formatter) { + this.id = id; + this.result = result; + this.formatter = formatter; + } + + public static JsonRpcSuccess fromPayload(Object id, @Nullable Object result, @Nullable MessageFormatter formatter) { + return new JsonRpcSuccess(id, result, formatter); + } public V getResult(Class resultType) { - return mapper.convertValue(result, resultType); + assert formatter != null; + return formatter.convertValue(result, resultType); } } diff --git a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java index 9b1d093..fc5ba09 100644 --- a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java +++ b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.cfg.ConstructorDetector; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import io.moderne.jsonrpc.JsonRpcError; @@ -30,13 +32,18 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Type; import java.util.Map; public class JsonMessageFormatter implements MessageFormatter { private final ObjectMapper mapper; public JsonMessageFormatter() { - this(new ObjectMapper() + this(JsonMapper.builder() + // to be able to construct classes that have @Data and a single field + // see https://cowtowncoder.medium.com/jackson-2-12-most-wanted-3-5-246624e2d3d0 + .constructorDetector(ConstructorDetector.USE_PROPERTIES_BASED) + .build() .registerModules(new ParameterNamesModule(), new JavaTimeModule()) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) .setSerializationInclusion(JsonInclude.Include.NON_NULL)); @@ -48,7 +55,11 @@ public JsonMessageFormatter() { } public JsonMessageFormatter(com.fasterxml.jackson.databind.Module... modules) { - this(new ObjectMapper() + this(JsonMapper.builder() + // to be able to construct classes that have @Data and a single field + // see https://cowtowncoder.medium.com/jackson-2-12-most-wanted-3-5-246624e2d3d0 + .constructorDetector(ConstructorDetector.USE_PROPERTIES_BASED) + .build() .registerModules(new ParameterNamesModule(), new JavaTimeModule()) .registerModules(modules) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) @@ -66,18 +77,26 @@ public JsonMessageFormatter(ObjectMapper mapper) { @Override public JsonRpcMessage deserialize(InputStream in) throws IOException { - Map payload = mapper.readValue(in, new TypeReference>() { - }); + Map payload = mapper.readValue(in, new TypeReference>() {}); if (payload.containsKey("method")) { return mapper.convertValue(payload, JsonRpcRequest.class); } else if (payload.containsKey("error")) { return mapper.convertValue(payload, JsonRpcError.class); } - return mapper.convertValue(payload, JsonRpcSuccess.class); + Object id = payload.get("id"); + if (id instanceof Number) { + id = ((Number) id).intValue(); + } + return JsonRpcSuccess.fromPayload(id, payload.get("result"), this); } @Override public void serialize(JsonRpcMessage message, OutputStream out) throws IOException { mapper.writeValue(out, message); } + + @Override + public T convertValue(Object value, Type type) { + return mapper.convertValue(value, mapper.getTypeFactory().constructType(type)); + } } diff --git a/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java b/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java index 1322d12..1c27489 100644 --- a/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java +++ b/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Type; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -28,6 +29,11 @@ public interface MessageFormatter { void serialize(JsonRpcMessage message, OutputStream out) throws IOException; + /** + * Converts a value (typically from JSON-RPC params or result) to the specified type. + */ + T convertValue(Object value, Type type); + default Charset getEncoding() { return StandardCharsets.UTF_8; } diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index 52c8044..ef717ed 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -15,6 +15,7 @@ */ package io.moderne.jsonrpc; +import io.moderne.jsonrpc.formatter.JsonMessageFormatter; import io.moderne.jsonrpc.handler.HeaderDelimitedMessageHandler; import io.moderne.jsonrpc.handler.TraceMessageHandler; import org.junit.jupiter.api.AfterEach; @@ -40,7 +41,10 @@ public class JsonRpcTest { void before() throws IOException { PipedOutputStream os = new PipedOutputStream(); PipedInputStream is = new PipedInputStream(os); - jsonRpc = new JsonRpc(new TraceMessageHandler("both", new HeaderDelimitedMessageHandler(is, os))); + JsonMessageFormatter formatter = new JsonMessageFormatter(); + jsonRpc = new JsonRpc( + new TraceMessageHandler("both", new HeaderDelimitedMessageHandler(formatter, is, os)), + formatter); } @AfterEach From 1c06ce60d684f6bf658ee8f167b522c83bba2da9 Mon Sep 17 00:00:00 2001 From: Knut Wannheden Date: Tue, 13 Jan 2026 09:29:05 +0100 Subject: [PATCH 2/2] Improve deserialization performance with streaming parser Optimize JSON-RPC message handling with several performance improvements: 1. Streaming deserialization using JsonParser and TokenBuffer: - Parse JSON-RPC messages with streaming API instead of reading into intermediate Map - Defer params/result parsing using TokenBuffer until actual type is needed via convertValue() - Primitives for 'result' are read directly without TokenBuffer 2. LimitedInputStream for zero-copy content reading: - Stream content directly from underlying input without copying to intermediate byte array - Properly handles remaining bytes after message deserialization 3. ObjectWriter caching via ClassValue for serialization: - Cache ObjectWriter instances per message type for faster writes 4. ThreadLocal ByteArrayOutputStream for send buffer reuse: - Avoid allocating new buffer for each message sent --- .../formatter/JsonMessageFormatter.java | 94 ++++++++++++++++--- .../HeaderDelimitedMessageHandler.java | 32 +++---- .../jsonrpc/internal/LimitedInputStream.java | 94 +++++++++++++++++++ 3 files changed, 190 insertions(+), 30 deletions(-) create mode 100644 src/main/java/io/moderne/jsonrpc/internal/LimitedInputStream.java diff --git a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java index fc5ba09..7d50c9d 100644 --- a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java +++ b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java @@ -17,11 +17,14 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.cfg.ConstructorDetector; import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.util.TokenBuffer; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import io.moderne.jsonrpc.JsonRpcError; @@ -33,10 +36,15 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Type; -import java.util.Map; public class JsonMessageFormatter implements MessageFormatter { private final ObjectMapper mapper; + private final ClassValue writerCache = new ClassValue() { + @Override + protected ObjectWriter computeValue(Class type) { + return mapper.writerFor(type); + } + }; public JsonMessageFormatter() { this(JsonMapper.builder() @@ -77,26 +85,90 @@ public JsonMessageFormatter(ObjectMapper mapper) { @Override public JsonRpcMessage deserialize(InputStream in) throws IOException { - Map payload = mapper.readValue(in, new TypeReference>() {}); - if (payload.containsKey("method")) { - return mapper.convertValue(payload, JsonRpcRequest.class); - } else if (payload.containsKey("error")) { - return mapper.convertValue(payload, JsonRpcError.class); + JsonParser parser = mapper.getFactory().createParser(in); + + Object id = null; + String method = null; + TokenBuffer params = null; + TokenBuffer errorBuffer = null; + Object result = null; + + if (parser.nextToken() != JsonToken.START_OBJECT) { + return JsonRpcError.invalidRequest(null, "Expected JSON object"); } - Object id = payload.get("id"); + + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case "jsonrpc": + parser.skipChildren(); + break; + case "id": + id = normalizeId(parser.readValueAs(Object.class)); + break; + case "method": + method = parser.getValueAsString(); + break; + case "params": + params = captureValue(parser); + break; + case "error": + errorBuffer = captureValue(parser); + break; + case "result": + JsonToken token = parser.currentToken(); + if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { + result = captureValue(parser); + } else { + result = parser.readValueAs(Object.class); + } + break; + default: + parser.skipChildren(); + break; + } + } + + if (method != null) { + return new JsonRpcRequest(id, method, params); + } else if (errorBuffer != null) { + JsonRpcError.Detail detail = convertValue(errorBuffer, JsonRpcError.Detail.class); + return new JsonRpcError(id, detail); + } + return JsonRpcSuccess.fromPayload(id, result, this); + } + + private TokenBuffer captureValue(JsonParser parser) throws IOException { + TokenBuffer buffer = new TokenBuffer(parser); + buffer.copyCurrentStructure(parser); + return buffer; + } + + private Object normalizeId(Object id) { if (id instanceof Number) { - id = ((Number) id).intValue(); + return ((Number) id).intValue(); } - return JsonRpcSuccess.fromPayload(id, payload.get("result"), this); + return id; } @Override public void serialize(JsonRpcMessage message, OutputStream out) throws IOException { - mapper.writeValue(out, message); + writerCache.get(message.getClass()).writeValue(out, message); } @Override public T convertValue(Object value, Type type) { + if (value instanceof TokenBuffer) { + try { + JsonParser bufferParser = ((TokenBuffer) value).asParser(); + bufferParser.nextToken(); + return mapper.readValue(bufferParser, mapper.getTypeFactory().constructType(type)); + } catch (IOException e) { + throw new RuntimeException("Failed to convert TokenBuffer", e); + } + } return mapper.convertValue(value, mapper.getTypeFactory().constructType(type)); } } diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index 8f8b8c1..47b578a 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -19,6 +19,7 @@ import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.formatter.JsonMessageFormatter; import io.moderne.jsonrpc.formatter.MessageFormatter; +import io.moderne.jsonrpc.internal.LimitedInputStream; import lombok.RequiredArgsConstructor; import java.io.*; @@ -35,6 +36,8 @@ @RequiredArgsConstructor public class HeaderDelimitedMessageHandler implements MessageHandler { private static final Pattern CONTENT_LENGTH = Pattern.compile("Content-Length: (\\d+)"); + private static final ThreadLocal SEND_BUFFER = + ThreadLocal.withInitial(() -> new ByteArrayOutputStream(8192)); private final MessageFormatter formatter; private final InputStream inputStream; @@ -68,20 +71,11 @@ public JsonRpcMessage receive() { } } - byte[] content = new byte[Integer.parseInt(contentLengthMatcher.group(1))]; - for (int totalRead = 0; totalRead < content.length; ) { - int bytesRead = inputStream.read(content, totalRead, content.length - totalRead); - if (bytesRead == -1) { - // Stream ended unexpectedly before reading full content - return JsonRpcError.invalidRequest(null, - "Content length mismatch. Expected " + content.length + - " but received " + totalRead); - } - totalRead += bytesRead; - } - - ByteArrayInputStream bis = new ByteArrayInputStream(content); - return formatter.deserialize(bis); + int length = Integer.parseInt(contentLengthMatcher.group(1)); + LimitedInputStream limited = new LimitedInputStream(inputStream, length); + JsonRpcMessage msg = formatter.deserialize(limited); + limited.skipRemaining(); + return msg; } catch (IOException e) { return JsonRpcError.invalidRequest(null, e.getMessage()); } @@ -104,16 +98,16 @@ private String readLineFromInputStream() throws IOException { @Override public void send(JsonRpcMessage msg) { try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - formatter.serialize(msg, bos); - byte[] content = bos.toByteArray(); - outputStream.write(("Content-Length: " + content.length + "\r\n").getBytes()); + ByteArrayOutputStream buffer = SEND_BUFFER.get(); + buffer.reset(); + formatter.serialize(msg, buffer); + outputStream.write(("Content-Length: " + buffer.size() + "\r\n").getBytes()); if (formatter.getEncoding() != StandardCharsets.UTF_8) { outputStream.write(("Content-Type: application/vscode-jsonrpc;charset=" + formatter.getEncoding().name() + "\r\n").getBytes()); } outputStream.write('\r'); outputStream.write('\n'); - outputStream.write(content); + buffer.writeTo(outputStream); outputStream.flush(); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/src/main/java/io/moderne/jsonrpc/internal/LimitedInputStream.java b/src/main/java/io/moderne/jsonrpc/internal/LimitedInputStream.java new file mode 100644 index 0000000..b93b393 --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/internal/LimitedInputStream.java @@ -0,0 +1,94 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.internal; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An InputStream wrapper that limits the number of bytes that can be read + * from the underlying stream. Once the limit is reached, further reads + * return -1 (EOF). + */ +public class LimitedInputStream extends FilterInputStream { + private long remaining; + + public LimitedInputStream(InputStream in, long limit) { + super(in); + this.remaining = limit; + } + + @Override + public int read() throws IOException { + if (remaining <= 0) { + return -1; + } + int result = in.read(); + if (result != -1) { + remaining--; + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (remaining <= 0) { + return -1; + } + int toRead = (int) Math.min(len, remaining); + int result = in.read(b, off, toRead); + if (result > 0) { + remaining -= result; + } + return result; + } + + @Override + public long skip(long n) throws IOException { + long toSkip = Math.min(n, remaining); + long skipped = in.skip(toSkip); + remaining -= skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return (int) Math.min(in.available(), remaining); + } + + @Override + public void close() { + // Do not close the underlying stream - we need it for subsequent messages + } + + /** + * Skips any remaining bytes up to the limit. + * Call this after reading to ensure the underlying stream is + * positioned correctly for subsequent reads. + */ + public void skipRemaining() throws IOException { + while (remaining > 0) { + long skipped = skip(remaining); + if (skipped <= 0) { + // skip() didn't work, try reading + if (read() == -1) { + break; + } + } + } + } +}