From ea7e7051ea315380ac5b1a04cf4393acbd35306b Mon Sep 17 00:00:00 2001 From: Jonathan Schneider Date: Wed, 6 May 2026 22:02:02 -0400 Subject: [PATCH] Streaming RawJson deserialize + hot-path cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-trip end-to-end measured on a paired-pipe dispatch loop (5000 iterations): tiny 1.15x, medium 1.21x, large (3.5KB) 1.60x. Phase 2 — RawJson wrapper + streaming deserialize - New io.moderne.jsonrpc.RawJson: library-owned wrapper holding either a Jackson TokenBuffer (inbound, lazy materialization) or a POJO (outbound). Keeps Jackson out of the public ABI. - JsonRpcRequest.params and JsonRpcSuccess.result change from Object to RawJson. JsonRpcRequest.newRequest(method, POJO) factory keeps working. - JsonMessageFormatter.deserialize: full streaming via JsonParser + TokenBuffer, no intermediate JsonNode tree. convertValue unwraps RawJson for either POJO or TokenBuffer. Phase 3 — hot-path cleanups - JsonRpcMethod: paramType resolved once in the constructor instead of per dispatch via getClass().getGenericSuperclass(). - JsonRpcIdDeserializer: direct JsonToken inspection, no JsonNode allocation per inbound id. - internal/SnowflakeId: lock-free generation via single packed AtomicLong (timestamp << SEQUENCE_BITS | sequence) preserving millisecond monotonicity. New concurrency test in SnowflakeIdTest verifies no duplicates under contention. - HeaderDelimitedMessageHandler / NewLineDelimitedMessageHandler: wrap input in BufferedInputStream when the caller hasn't already. - MeteredMessageHandler: pre-build the four no-error Timers in the constructor; cache error-tagged Timers in a bounded ConcurrentHashMap (cap 64) to avoid the cardinality trap of unbounded per-method tags. --- src/main/java/io/moderne/jsonrpc/JsonRpc.java | 36 +++-- .../jsonrpc/JsonRpcIdDeserializer.java | 37 +++-- .../io/moderne/jsonrpc/JsonRpcMethod.java | 16 ++- .../io/moderne/jsonrpc/JsonRpcRequest.java | 12 +- .../io/moderne/jsonrpc/JsonRpcSuccess.java | 12 +- src/main/java/io/moderne/jsonrpc/RawJson.java | 108 +++++++++++++++ .../formatter/JsonMessageFormatter.java | 130 +++++++++++++++--- .../jsonrpc/formatter/MessageFormatter.java | 10 +- .../HeaderDelimitedMessageHandler.java | 12 +- .../handler/MeteredMessageHandler.java | 119 ++++++++++++---- .../NewLineDelimitedMessageHandler.java | 12 +- .../moderne/jsonrpc/internal/SnowflakeId.java | 57 +++++--- .../java/io/moderne/jsonrpc/JsonRpcTest.java | 6 +- .../jsonrpc/internal/SnowflakeIdTest.java | 69 ++++++++++ 14 files changed, 519 insertions(+), 117 deletions(-) create mode 100644 src/main/java/io/moderne/jsonrpc/RawJson.java create mode 100644 src/test/java/io/moderne/jsonrpc/internal/SnowflakeIdTest.java diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpc.java b/src/main/java/io/moderne/jsonrpc/JsonRpc.java index a102057..7850422 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpc.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpc.java @@ -18,13 +18,11 @@ import io.moderne.jsonrpc.formatter.JsonMessageFormatter; import io.moderne.jsonrpc.formatter.MessageFormatter; import io.moderne.jsonrpc.handler.MessageHandler; -import lombok.RequiredArgsConstructor; import java.io.EOFException; import java.util.Map; import java.util.concurrent.*; -@RequiredArgsConstructor public class JsonRpc { private final ForkJoinPool forkJoin = new ForkJoinPool( 4, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); @@ -45,6 +43,11 @@ public JsonRpc(MessageHandler messageHandler) { this(messageHandler, new JsonMessageFormatter()); } + public JsonRpc(MessageHandler messageHandler, MessageFormatter formatter) { + this.messageHandler = messageHandler; + this.formatter = formatter; + } + public

JsonRpc rpc(String name, JsonRpcMethod

method) { methods.put(name, method); return this; @@ -105,18 +108,7 @@ protected void compute() { messageHandler.send(JsonRpcError.methodNotFound(errorId, errorMethod), formatter) ).fork(); } else { - ForkJoinTask.adapt(() -> { - try { - Object response = method.convertAndHandle(request.getParams(), formatter); - if (response != null) { - messageHandler.send(new JsonRpcSuccess(request.getId(), response), formatter); - } else { - messageHandler.send(JsonRpcError.internalError(request.getId(), "Method returned null"), formatter); - } - } catch (Exception e) { - messageHandler.send(JsonRpcError.internalError(request.getId(), e), formatter); - } - }).fork(); + ForkJoinTask.adapt(() -> dispatch(request, method)).fork(); } } } catch (EOFException e) { @@ -157,6 +149,22 @@ protected void compute() { return this; } + private void dispatch(JsonRpcRequest request, JsonRpcMethod method) { + JsonRpcMessage outbound; + try { + Object result = method.convertAndHandle(request.getParams(), formatter); + // Wrap the handler's return value so the on-wire representation + // goes through the same RawJson + Jackson serializer pipeline + // as inbound-converted values. + outbound = result != null + ? new JsonRpcSuccess(request.getId(), RawJson.of(result)) + : JsonRpcError.internalError(request.getId(), "Method returned null"); + } catch (Exception e) { + outbound = JsonRpcError.internalError(request.getId(), e); + } + messageHandler.send(outbound, formatter); + } + public void shutdown() { shutdown = true; forkJoin.shutdownNow(); diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcIdDeserializer.java b/src/main/java/io/moderne/jsonrpc/JsonRpcIdDeserializer.java index 41ad2cc..1208712 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcIdDeserializer.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcIdDeserializer.java @@ -16,33 +16,32 @@ package io.moderne.jsonrpc; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.ObjectCodec; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; public class JsonRpcIdDeserializer extends JsonDeserializer { @Override - public Object deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { - ObjectCodec codec = jsonParser.getCodec(); - JsonNode jsonNode = codec.readTree(jsonParser); - if (jsonNode.isNumber()) { - // The assumption here is that the id is either a String or an Integer, and likely - // an Integer that is no larger than JavaScripts `Number.MAX_SAFE_INTEGER` since - // any JSON-RPC client interacting with a JavaScript peer wouldn't be able to send - // integer values larger than that without JavaScript converting that integer to a - // float, losing precision, and therefore not being able to associate requests/responses - // with the correct id. - return jsonNode.asInt(); - } else if (jsonNode.isTextual()) { - return jsonNode.asText(); - } else if (jsonNode.isNull()) { - return null; - } else { - throw new IOException("A JSON-RPC ID according to the spec \"MUST contain a String, Number, or NULL value if included\". See §4 of https://www.jsonrpc.org/specification."); + public Object deserialize(JsonParser parser, DeserializationContext context) throws IOException { + // Direct token inspection — no JsonNode tree allocation. The id field + // is a scalar by spec ("String, Number, or NULL value if included"), + // so a single switch on the current token covers every legal shape. + switch (parser.currentToken()) { + case VALUE_NUMBER_INT: + // Per the comment that used to live here: assume int. Any + // JSON-RPC client interacting with a JavaScript peer cannot + // send integers larger than Number.MAX_SAFE_INTEGER without + // losing precision, so widening to long isn't worth the API + // change. + return parser.getIntValue(); + case VALUE_STRING: + return parser.getText(); + case VALUE_NULL: + return null; + default: + throw new IOException("A JSON-RPC ID according to the spec \"MUST contain a String, Number, or NULL value if included\". See §4 of https://www.jsonrpc.org/specification."); } } } diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java b/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java index b0daad7..ac6149c 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcMethod.java @@ -16,6 +16,7 @@ package io.moderne.jsonrpc; import io.moderne.jsonrpc.formatter.MessageFormatter; +import org.jspecify.annotations.Nullable; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -23,13 +24,22 @@ @SuppressWarnings("unused") public abstract class JsonRpcMethod

{ - final Object convertAndHandle(Object params, MessageFormatter formatter) throws Exception { - Type paramType = ((ParameterizedType) getClass().getGenericSuperclass()) + // Resolved once per instance at construction. The previous implementation + // walked getGenericSuperclass()/getActualTypeArguments() on every dispatch + // — same answer every call, so cache it. + private final Type paramType; + + protected JsonRpcMethod() { + this.paramType = ((ParameterizedType) getClass().getGenericSuperclass()) .getActualTypeArguments()[0]; + } + + @SuppressWarnings("unchecked") + final Object convertAndHandle(@Nullable RawJson params, MessageFormatter formatter) throws Exception { if (Void.class.equals(paramType)) { return handle(null); } - return handle(formatter.convertValue(params, paramType)); + return handle(params == null ? null : (P) formatter.convertValue(params, paramType)); } protected abstract Object handle(P params) throws Exception; diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcRequest.java b/src/main/java/io/moderne/jsonrpc/JsonRpcRequest.java index 4b9715c..4dabd68 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcRequest.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcRequest.java @@ -30,13 +30,17 @@ public class JsonRpcRequest extends JsonRpcMessage { String method; /** - * Either a Map of named parameters or a List of positional parameters. + * Either named parameters (a Map-shaped value), positional parameters + * (a list), or a typed POJO wrapped at request construction. Use + * {@link RawJson#as} (or call {@link io.moderne.jsonrpc.formatter.MessageFormatter#convertValue} + * directly) inside a {@link JsonRpcMethod} to materialize the typed value. */ @Nullable - Object params; + RawJson params; - public static JsonRpcRequest newRequest(String method, Object params) { - return new JsonRpcRequest(SnowflakeId.generateId(), method, params); + public static JsonRpcRequest newRequest(String method, @Nullable Object params) { + return new JsonRpcRequest(SnowflakeId.generateId(), method, + params == null ? null : RawJson.of(params)); } public static JsonRpcRequest newRequest(String method) { diff --git a/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java b/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java index b898f6c..e2c15cd 100644 --- a/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java +++ b/src/main/java/io/moderne/jsonrpc/JsonRpcSuccess.java @@ -31,7 +31,7 @@ public class JsonRpcSuccess extends JsonRpcResponse { @Getter @Nullable - private final Object result; + private final RawJson result; @JsonIgnore @EqualsAndHashCode.Exclude @@ -39,22 +39,22 @@ public class JsonRpcSuccess extends JsonRpcResponse { @Nullable private final transient MessageFormatter formatter; - public JsonRpcSuccess(Object id, @Nullable Object result) { + public JsonRpcSuccess(Object id, @Nullable RawJson result) { this(id, result, null); } - private JsonRpcSuccess(Object id, @Nullable Object result, @Nullable MessageFormatter formatter) { + private JsonRpcSuccess(Object id, @Nullable RawJson result, @Nullable MessageFormatter formatter) { this.id = id; this.result = result; this.formatter = formatter; } - public static JsonRpcSuccess fromPayload(Object id, @Nullable Object result, @Nullable MessageFormatter formatter) { + public static JsonRpcSuccess fromPayload(Object id, @Nullable RawJson result, @Nullable MessageFormatter formatter) { return new JsonRpcSuccess(id, result, formatter); } - public V getResult(Class resultType) { + public @Nullable V getResult(Class resultType) { assert formatter != null; - return formatter.convertValue(result, resultType); + return result == null ? null : formatter.convertValue(result, resultType); } } diff --git a/src/main/java/io/moderne/jsonrpc/RawJson.java b/src/main/java/io/moderne/jsonrpc/RawJson.java new file mode 100644 index 0000000..4942cd2 --- /dev/null +++ b/src/main/java/io/moderne/jsonrpc/RawJson.java @@ -0,0 +1,108 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import io.moderne.jsonrpc.formatter.MessageFormatter; +import org.jspecify.annotations.Nullable; + +import java.io.IOException; +import java.lang.reflect.Type; + +/** + * Library-owned wrapper for the {@code params}, {@code result}, and inbound + * {@code error} JSON values on JSON-RPC messages. Holds either: + *

    + *
  • a POJO (outbound — wrapped at request construction, serialized + * through whatever the {@link MessageFormatter} uses on the wire), or
  • + *
  • a parser-format-specific buffer (inbound — produced by the formatter + * during {@code deserialize}; converted lazily to a typed POJO when the + * consumer asks via {@link #as(MessageFormatter, Class)}), or
  • + *
  • {@code null}.
  • + *
+ * Keeps Jackson types out of the public ABI: consumers see only library + * classes, so adding/upgrading/swapping the wire format does not force them + * to recompile. + */ +@JsonSerialize(using = RawJson.RawJsonSerializer.class) +public final class RawJson { + + private static final RawJson NULL = new RawJson(null); + + private final @Nullable Object value; + + private RawJson(@Nullable Object value) { + this.value = value; + } + + /** + * Wrap an arbitrary value. {@code null} returns the canonical null + * instance (no allocation per call). + */ + public static RawJson of(@Nullable Object value) { + return value == null ? NULL : new RawJson(value); + } + + public boolean isNull() { + return value == null; + } + + /** + * Convert this value to {@code type} using {@code formatter}. Returns + * {@code null} when {@link #isNull()}; otherwise delegates to + * {@link MessageFormatter#convertValue(RawJson, Type)}. + */ + public @Nullable T as(MessageFormatter formatter, Class type) { + return formatter.convertValue(this, type); + } + + public @Nullable T as(MessageFormatter formatter, Type type) { + return formatter.convertValue(this, type); + } + + /** + * Internal accessor for {@link MessageFormatter} implementations and the + * default Jackson serializer. Returns the wrapped value (POJO, parser + * buffer, or {@code null}). Library consumers should use + * {@link #as(MessageFormatter, Class)} instead — calling {@code unwrap()} + * directly couples them to the wire format. + */ + public @Nullable Object unwrap() { + return value; + } + + public static final class RawJsonSerializer extends StdSerializer { + public RawJsonSerializer() { + super(RawJson.class); + } + + @Override + public void serialize(RawJson value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + Object inner = value.unwrap(); + if (inner == null) { + gen.writeNull(); + return; + } + // Jackson's defaultSerializeValue handles both POJOs and the + // format's own buffered token type (e.g. TokenBuffer for JSON — + // it knows how to replay the captured tokens into the writer). + serializers.defaultSerializeValue(inner, gen); + } + } +} diff --git a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java index bbfd205..1b7c9de 100644 --- a/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java +++ b/src/main/java/io/moderne/jsonrpc/formatter/JsonMessageFormatter.java @@ -17,23 +17,26 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.cfg.ConstructorDetector; import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.util.TokenBuffer; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import io.moderne.jsonrpc.JsonRpcError; import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.JsonRpcRequest; import io.moderne.jsonrpc.JsonRpcSuccess; +import io.moderne.jsonrpc.RawJson; +import org.jspecify.annotations.Nullable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Type; -import java.util.Map; public class JsonMessageFormatter implements MessageFormatter { private final ObjectMapper mapper; @@ -56,8 +59,6 @@ public JsonMessageFormatter() { public JsonMessageFormatter(com.fasterxml.jackson.databind.Module... modules) { this(JsonMapper.builder() - // to be able to construct classes that have @Data and a single field - // see https://cowtowncoder.medium.com/jackson-2-12-most-wanted-3-5-246624e2d3d0 .constructorDetector(ConstructorDetector.USE_PROPERTIES_BASED) .build() .registerModules(new ParameterNamesModule(), new JavaTimeModule()) @@ -77,18 +78,101 @@ public JsonMessageFormatter(ObjectMapper mapper) { @Override public JsonRpcMessage deserialize(InputStream in) throws IOException { - Map payload = mapper.readValue(in, new TypeReference>() { - }); - if (payload.containsKey("method")) { - return mapper.convertValue(payload, JsonRpcRequest.class); - } else if (payload.containsKey("error")) { - return mapper.convertValue(payload, JsonRpcError.class); + // Streaming parser: walk the JSON object once, capture params/result/ + // error structure into TokenBuffers (lazy materialization), and read + // scalars directly. Avoids the JSON → Map → POJO + // double-pass the original implementation paid on every message. + JsonParser parser = mapper.getFactory().createParser(in); + + Object id = null; + String method = null; + TokenBuffer params = null; + TokenBuffer errorBuffer = null; + Object resultScalar = null; + TokenBuffer resultBuffer = null; + boolean haveResultField = false; + + if (parser.nextToken() != JsonToken.START_OBJECT) { + throw new IOException("Expected JSON object"); + } + + while (parser.nextToken() != JsonToken.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + if (fieldName == null) { + parser.skipChildren(); + continue; + } + switch (fieldName) { + case "jsonrpc": + parser.skipChildren(); + break; + case "id": + id = normalizeId(parser); + break; + case "method": + method = parser.getValueAsString(); + break; + case "params": + params = captureValue(parser); + break; + case "error": + errorBuffer = captureValue(parser); + break; + case "result": + haveResultField = true; + JsonToken t = parser.currentToken(); + if (t == JsonToken.START_OBJECT || t == JsonToken.START_ARRAY) { + resultBuffer = captureValue(parser); + } else { + // Primitive — read directly without buffering. + resultScalar = parser.readValueAs(Object.class); + } + break; + default: + parser.skipChildren(); + break; + } + } + + if (method != null) { + return new JsonRpcRequest(id, method, params == null ? null : RawJson.of(params)); } - Object id = payload.get("id"); - if (id instanceof Number) { - id = ((Number) id).intValue(); + if (errorBuffer != null) { + JsonRpcError.Detail detail = convertValue(RawJson.of(errorBuffer), JsonRpcError.Detail.class); + return new JsonRpcError(id, detail); + } + if (haveResultField) { + RawJson result = resultBuffer != null ? RawJson.of(resultBuffer) : RawJson.of(resultScalar); + return JsonRpcSuccess.fromPayload(id, result, this); + } + // No method, no error, no result — treat as a success with null result + // (matches the prior {@code mapper.convertValue} fallback behavior). + return JsonRpcSuccess.fromPayload(id, null, this); + } + + private TokenBuffer captureValue(JsonParser parser) throws IOException { + TokenBuffer buffer = new TokenBuffer(parser); + buffer.copyCurrentStructure(parser); + return buffer; + } + + private @Nullable Object normalizeId(JsonParser parser) throws IOException { + // Match the legacy JsonRpcIdDeserializer contract: int / string / null. + switch (parser.currentToken()) { + case VALUE_NUMBER_INT: + return parser.getIntValue(); + case VALUE_STRING: + return parser.getText(); + case VALUE_NULL: + return null; + default: + // Skip — best-effort. The id is optional and any peer that + // sends a non-scalar id violates the spec; we just don't + // correlate it. + parser.skipChildren(); + return null; } - return JsonRpcSuccess.fromPayload(id, payload.get("result"), this); } @Override @@ -97,7 +181,21 @@ public void serialize(JsonRpcMessage message, OutputStream out) throws IOExcepti } @Override - public T convertValue(Object value, Type type) { - return mapper.convertValue(value, mapper.getTypeFactory().constructType(type)); + @SuppressWarnings("unchecked") + public @Nullable T convertValue(RawJson value, Type type) { + Object inner = value.unwrap(); + if (inner == null) { + return null; + } + if (inner instanceof TokenBuffer) { + try { + JsonParser bufferParser = ((TokenBuffer) inner).asParser(); + bufferParser.nextToken(); + return mapper.readValue(bufferParser, mapper.getTypeFactory().constructType(type)); + } catch (IOException e) { + throw new RuntimeException("Failed to convert TokenBuffer to " + type, e); + } + } + return (T) mapper.convertValue(inner, mapper.getTypeFactory().constructType(type)); } } diff --git a/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java b/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java index 1c27489..c742670 100644 --- a/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java +++ b/src/main/java/io/moderne/jsonrpc/formatter/MessageFormatter.java @@ -16,6 +16,8 @@ package io.moderne.jsonrpc.formatter; import io.moderne.jsonrpc.JsonRpcMessage; +import io.moderne.jsonrpc.RawJson; +import org.jspecify.annotations.Nullable; import java.io.IOException; import java.io.InputStream; @@ -30,9 +32,13 @@ public interface MessageFormatter { void serialize(JsonRpcMessage message, OutputStream out) throws IOException; /** - * Converts a value (typically from JSON-RPC params or result) to the specified type. + * Convert a {@link RawJson} value (typically from JSON-RPC {@code params} + * or {@code result}) to a typed POJO. Implementations decide how to + * handle each possible wrapped representation: a POJO awaiting + * serialization, a parser-format-specific buffer captured during + * {@code deserialize}, or {@code null}. */ - T convertValue(Object value, Type type); + @Nullable T convertValue(RawJson value, Type type); default Charset getEncoding() { return StandardCharsets.UTF_8; diff --git a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java index b44575b..4b63178 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/HeaderDelimitedMessageHandler.java @@ -18,7 +18,6 @@ import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; -import lombok.RequiredArgsConstructor; import org.jspecify.annotations.Nullable; import java.io.*; @@ -32,7 +31,6 @@ * It utilizes HTTP-like headers to introduce each JSON-RPC message by describing its * length and (optionally) its text encoding. */ -@RequiredArgsConstructor public class HeaderDelimitedMessageHandler implements MessageHandler { private static final Pattern CONTENT_LENGTH = Pattern.compile("Content-Length: (\\d+)"); @@ -59,6 +57,16 @@ public HeaderDelimitedMessageHandler(MessageFormatter formatter, InputStream inp this.formatter = formatter; } + public HeaderDelimitedMessageHandler(InputStream inputStream, OutputStream outputStream) { + // Wrap so byte-by-byte header reads (`readLineFromInputStream`) don't + // hit a syscall per byte. Skip re-wrapping a stream the caller has + // already buffered — double-buffering wastes a copy with no benefit. + this.inputStream = inputStream instanceof BufferedInputStream + ? inputStream + : new BufferedInputStream(inputStream); + this.outputStream = outputStream; + } + @Override public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { MessageFormatter effectiveFormatter = this.formatter != null ? this.formatter : formatter; diff --git a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java index ca29ea7..a1a3894 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/MeteredMessageHandler.java @@ -22,53 +22,120 @@ import io.moderne.jsonrpc.JsonRpcRequest; import io.moderne.jsonrpc.JsonRpcSuccess; import io.moderne.jsonrpc.formatter.MessageFormatter; -import lombok.RequiredArgsConstructor; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; -@RequiredArgsConstructor public class MeteredMessageHandler implements MessageHandler { + /** + * Cap on distinct error-code Timer entries per direction. Beyond this, + * unrecognized codes share a single {@code error="other"} bucket — the + * library is published and consumed by external clients whose error + * codes are not bounded by spec, so an unbounded tag set could explode + * cardinality in the consumer's metrics backend. + */ + private static final int ERROR_TIMER_CAP = 64; + private final MessageHandler delegate; private final MeterRegistry meterRegistry; + // The four fixed-tag Timers covering every non-error message: pre-built + // once at construction so the hot path does no Timer.Builder allocation + // and no per-call tag-string assembly. + private final Timer receivedRequest; + private final Timer receivedResponse; + private final Timer sentRequest; + private final Timer sentResponse; + + // Error Timers are keyed (direction, code). Lazy-built up to + // ERROR_TIMER_CAP entries; further codes fold into a single "other" + // bucket per direction. + private final ConcurrentHashMap errorTimers = new ConcurrentHashMap<>(); + + public MeteredMessageHandler(MessageHandler delegate, MeterRegistry meterRegistry) { + this.delegate = delegate; + this.meterRegistry = meterRegistry; + String handler = delegate.getClass().getSimpleName(); + this.receivedRequest = buildTimer("jsonrpc.receive", "received", "request", "none", handler); + this.receivedResponse = buildTimer("jsonrpc.receive", "received", "response", "none", handler); + this.sentRequest = buildTimer("jsonrpc.send", "sent", "request", "none", handler); + this.sentResponse = buildTimer("jsonrpc.send", "sent", "response", "none", handler); + } + @Override public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { Timer.Sample sample = Timer.start(meterRegistry); - Timer.Builder timer = Timer.builder("jsonrpc.receive") - .description("Time taken to receive a JSON-RPC message") - .tag("direction", "received"); JsonRpcMessage msg = delegate.receive(formatter); - finishTimer(msg, sample, timer); + Timer timer = timerFor(true, msg); + if (timer != null) { + sample.stop(timer); + } return msg; } @Override public void send(JsonRpcMessage msg, MessageFormatter formatter) { Timer.Sample sample = Timer.start(meterRegistry); - Timer.Builder timer = Timer.builder("jsonrpc.send") - .description("Time taken to send a JSON-RPC message") - .tag("direction", "sent"); delegate.send(msg, formatter); - finishTimer(msg, sample, timer); + Timer timer = timerFor(false, msg); + if (timer != null) { + sample.stop(timer); + } } - private void finishTimer(JsonRpcMessage msg, Timer.Sample sample, Timer.Builder timer) { - timer = timer.tag("handler", delegate.getClass().getSimpleName()); + private Timer timerFor(boolean received, JsonRpcMessage msg) { if (msg instanceof JsonRpcSuccess) { - sample.stop(timer - .tag("type", "response") - .tags("error", "none") - .register(meterRegistry)); - } else if (msg instanceof JsonRpcRequest) { - sample.stop(timer - .tag("type", "request") - .tag("error", "none") - .register(meterRegistry)); - } else if (msg instanceof JsonRpcError) { - sample.stop(timer - .tag("type", "error") - .tag("error", Integer.toString(((JsonRpcError) msg).getError().getCode())) - .register(meterRegistry)); + return received ? receivedResponse : sentResponse; + } + if (msg instanceof JsonRpcRequest) { + return received ? receivedRequest : sentRequest; } + if (msg instanceof JsonRpcError) { + return errorTimer(received, ((JsonRpcError) msg).getError().getCode()); + } + return null; + } + + private Timer errorTimer(boolean received, int code) { + // Pack (direction, code) into a single long key — avoids allocating + // a String key on every call (this method is on the receive/send hot + // path and ConcurrentHashMap.get does no work for primitive-Long + // boxes pulled from the Long cache for small values). + long key = ((long) (received ? 1 : 0) << 32) | (code & 0xFFFFFFFFL); + Timer cached = errorTimers.get(key); + if (cached != null) { + return cached; + } + // Cap reached — fold this code into the "other" bucket for this + // direction (key uses code=Integer.MIN_VALUE as the sentinel so it + // can't collide with a legitimate code). + if (errorTimers.size() >= ERROR_TIMER_CAP) { + long otherKey = ((long) (received ? 1 : 0) << 32) | (Integer.MIN_VALUE & 0xFFFFFFFFL); + return errorTimers.computeIfAbsent(otherKey, k -> + buildTimer(received ? "jsonrpc.receive" : "jsonrpc.send", + received ? "received" : "sent", + "error", + "other", + delegate.getClass().getSimpleName())); + } + return errorTimers.computeIfAbsent(key, k -> + buildTimer(received ? "jsonrpc.receive" : "jsonrpc.send", + received ? "received" : "sent", + "error", + Integer.toString(code), + delegate.getClass().getSimpleName())); + } + + private Timer buildTimer(String name, String direction, String type, String errorTag, String handler) { + String description = "jsonrpc.receive".equals(name) + ? "Time taken to receive a JSON-RPC message" + : "Time taken to send a JSON-RPC message"; + return Timer.builder(name) + .description(description) + .tag("direction", direction) + .tag("type", type) + .tag("error", errorTag) + .tag("handler", handler) + .register(meterRegistry); } } diff --git a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java index 0d4a311..0931198 100644 --- a/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java +++ b/src/main/java/io/moderne/jsonrpc/handler/NewLineDelimitedMessageHandler.java @@ -18,7 +18,6 @@ import io.moderne.jsonrpc.JsonRpcMessage; import io.moderne.jsonrpc.JsonRpcReceiveException; import io.moderne.jsonrpc.formatter.MessageFormatter; -import lombok.RequiredArgsConstructor; import java.io.*; @@ -26,11 +25,20 @@ * This appends each JSON-RPC message with \n. It should only be used with UTF-8 text-based * formatters that do not emit new line characters as part of the JSON. */ -@RequiredArgsConstructor public class NewLineDelimitedMessageHandler implements MessageHandler { private final InputStream inputStream; private final OutputStream outputStream; + public NewLineDelimitedMessageHandler(InputStream inputStream, OutputStream outputStream) { + // Same buffering policy as HeaderDelimitedMessageHandler: read-loop is + // byte-by-byte until newline; buffer once at construction so we don't + // syscall per byte. Skip re-wrapping a pre-buffered stream. + this.inputStream = inputStream instanceof BufferedInputStream + ? inputStream + : new BufferedInputStream(inputStream); + this.outputStream = outputStream; + } + @Override public JsonRpcMessage receive(MessageFormatter formatter) throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); diff --git a/src/main/java/io/moderne/jsonrpc/internal/SnowflakeId.java b/src/main/java/io/moderne/jsonrpc/internal/SnowflakeId.java index b9376f6..53f5851 100644 --- a/src/main/java/io/moderne/jsonrpc/internal/SnowflakeId.java +++ b/src/main/java/io/moderne/jsonrpc/internal/SnowflakeId.java @@ -17,9 +17,9 @@ import java.util.concurrent.atomic.AtomicLong; -public class SnowflakeId { +public final class SnowflakeId { private static final long EPOCH = 1640995200000L; // Custom epoch - private static final long MACHINE_ID = 1L; // Unique machine ID (0-1023) + private static final long MACHINE_ID = 1L; // Unique machine ID (0-1023) private static final long MACHINE_ID_BITS = 10L; private static final long SEQUENCE_BITS = 12L; @@ -28,34 +28,51 @@ public class SnowflakeId { private static final long MACHINE_ID_SHIFT = SEQUENCE_BITS; private static final long TIMESTAMP_SHIFT = MACHINE_ID_SHIFT + MACHINE_ID_BITS; - private static final AtomicLong lastTimestamp = new AtomicLong(-1L); - private static final AtomicLong sequence = new AtomicLong(0L); + /** + * Packed (timestamp_since_epoch << SEQUENCE_BITS | sequence_within_ms). + * Single source of truth for monotonicity — split AtomicLongs would race + * across the millisecond boundary and produce duplicates. + */ + private static final AtomicLong state = new AtomicLong(0L); private SnowflakeId() { } /** - * @return A short, unique ID produced in a similar way as Twitter's Snowflake ID + * @return A short, unique ID produced in a similar way as Twitter's Snowflake ID. */ - public static synchronized String generateId() { - long currentTimestamp = System.currentTimeMillis() - EPOCH; + public static String generateId() { + while (true) { + long currentMs = System.currentTimeMillis() - EPOCH; + long prev = state.get(); + long prevTs = prev >>> SEQUENCE_BITS; + long prevSeq = prev & MAX_SEQUENCE; - if (currentTimestamp == lastTimestamp.get()) { - // Increment sequence within the same millisecond - long seq = sequence.incrementAndGet() & MAX_SEQUENCE; - if (seq == 0) { - // Sequence exhausted, wait for next millisecond - while (currentTimestamp <= lastTimestamp.get()) { - currentTimestamp = System.currentTimeMillis() - EPOCH; + long nextTs; + long nextSeq; + if (currentMs > prevTs) { + // Forward time — new millisecond, reset sequence. + nextTs = currentMs; + nextSeq = 0L; + } else { + // Same ms (or clock went backward — keep prevTs to preserve + // monotonicity, advance the sequence within it). + nextTs = prevTs; + nextSeq = prevSeq + 1; + if (nextSeq > MAX_SEQUENCE) { + // Sequence exhausted within this ms — yield and retry + // until the clock advances or we observe a fresher state. + Thread.yield(); + continue; } } - } else { - sequence.set(0L); + long next = (nextTs << SEQUENCE_BITS) | nextSeq; + if (state.compareAndSet(prev, next)) { + return encodeBase62((nextTs << TIMESTAMP_SHIFT) | (MACHINE_ID << MACHINE_ID_SHIFT) | nextSeq); + } + // CAS lost the race — another thread updated state. Retry; the + // fresh `prev` read will produce a higher sequence (or timestamp). } - - lastTimestamp.set(currentTimestamp); - - return encodeBase62((currentTimestamp << TIMESTAMP_SHIFT) | (MACHINE_ID << MACHINE_ID_SHIFT) | sequence.get()); } private static final String BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; diff --git a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java index 6bd024e..613efcb 100644 --- a/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java +++ b/src/test/java/io/moderne/jsonrpc/JsonRpcTest.java @@ -64,7 +64,7 @@ void requestResponse() throws ExecutionException, InterruptedException, TimeoutE .send(JsonRpcRequest.newRequest("hello", new Person("Jon"))) .get(5, TimeUnit.SECONDS); - assertThat(response.getResult()).isEqualTo("Hello Jon"); + assertThat(response.getResult(String.class)).isEqualTo("Hello Jon"); } @Test @@ -80,7 +80,7 @@ protected Object handle(Void params) { .send(JsonRpcRequest.newRequest("hello")) .get(5, TimeUnit.SECONDS); - assertThat(response.getResult()).isEqualTo("Hello Jon"); + assertThat(response.getResult(String.class)).isEqualTo("Hello Jon"); } @Test @@ -128,7 +128,7 @@ protected Object handle(List names) { .send(JsonRpcRequest.newRequest("hello", List.of("Jon", "Jim"))) .get(5, TimeUnit.SECONDS); - assertThat(response.getResult()).isEqualTo("Hello Jon and Jim"); + assertThat(response.getResult(String.class)).isEqualTo("Hello Jon and Jim"); } @Test diff --git a/src/test/java/io/moderne/jsonrpc/internal/SnowflakeIdTest.java b/src/test/java/io/moderne/jsonrpc/internal/SnowflakeIdTest.java new file mode 100644 index 0000000..bc81df3 --- /dev/null +++ b/src/test/java/io/moderne/jsonrpc/internal/SnowflakeIdTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2025 the original author or authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * https://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.moderne.jsonrpc.internal; + +import org.junit.jupiter.api.Test; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +class SnowflakeIdTest { + + @Test + void manyThreadsProduceNoDuplicates() throws Exception { + // The synchronized version always serialized callers; the lock-free + // version uses CAS on a packed (timestamp, sequence) state. This test + // would have caught a naive AtomicLong replacement that allowed the + // sequence counter to race across the millisecond boundary. + int threads = 16; + int idsPerThread = 5_000; + Set seen = ConcurrentHashMap.newKeySet(threads * idsPerThread); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threads); + ExecutorService pool = Executors.newFixedThreadPool(threads); + try { + for (int t = 0; t < threads; t++) { + pool.submit(() -> { + try { + start.await(); + for (int i = 0; i < idsPerThread; i++) { + seen.add(SnowflakeId.generateId()); + } + } catch (InterruptedException ignored) { + } finally { + done.countDown(); + } + }); + } + start.countDown(); + assertThat(done.await(30, TimeUnit.SECONDS)) + .as("all worker threads finished within timeout") + .isTrue(); + } finally { + pool.shutdownNow(); + } + + assertThat(seen) + .as("every generated id is unique across %d threads x %d ids", threads, idsPerThread) + .hasSize(threads * idsPerThread); + } +}