From 11f9242a5f9c178cc65b4d37b6322036a9c90c32 Mon Sep 17 00:00:00 2001 From: scottf Date: Sun, 24 Aug 2025 06:53:47 -0400 Subject: [PATCH 1/2] Direct batch construction accepts stream info if caller already has it. --- direct-batch/build.gradle | 9 +++- .../io/synadia/examples/ExampleUtils.java | 4 ++ .../io/synadia/direct/DirectBatchContext.java | 16 ++++-- .../direct/MessageBatchGetRequest.java | 53 +++++++++++++------ 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/direct-batch/build.gradle b/direct-batch/build.gradle index 470e189..346c497 100644 --- a/direct-batch/build.gradle +++ b/direct-batch/build.gradle @@ -38,7 +38,8 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.21.5' + implementation 'io.nats:jnats:2.22.0.2_12-SNAPSHOT' + implementation 'org.jspecify:jspecify:1.0.0' testImplementation 'io.nats:jnats-server-runner:1.2.8' testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' @@ -46,6 +47,10 @@ dependencies { testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.12.3' } +configurations.configureEach { + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' +} + sourceSets { main { java { @@ -201,4 +206,4 @@ if (isRelease) { sign configurations.archives sign publishing.publications.mavenJava } -} \ No newline at end of file +} diff --git a/direct-batch/src/examples/java/io/synadia/examples/ExampleUtils.java b/direct-batch/src/examples/java/io/synadia/examples/ExampleUtils.java index ac53044..802de7a 100644 --- a/direct-batch/src/examples/java/io/synadia/examples/ExampleUtils.java +++ b/direct-batch/src/examples/java/io/synadia/examples/ExampleUtils.java @@ -2,6 +2,7 @@ import io.nats.client.NUID; import io.nats.client.api.MessageInfo; +import io.nats.client.impl.Headers; import io.nats.client.support.DateTimeUtils; import java.util.ArrayList; @@ -22,10 +23,13 @@ public static void printMessageInfo(List list) { public static void printMessageInfo(MessageInfo mi, Number listId) { if (mi.isMessage()) { + Headers h = mi.getHeaders(); + String hs = (h == null || h.isEmpty()) ? "none" : h.toString(); System.out.println("[" + listId + "] Message" + " | subject: " + mi.getSubject() + " | sequence: " + mi.getSeq() + " | time: " + (mi.getTime() == null ? "null" : DateTimeUtils.toRfc3339(mi.getTime())) + + " | headers: " + hs ); } else { diff --git a/direct-batch/src/main/java/io/synadia/direct/DirectBatchContext.java b/direct-batch/src/main/java/io/synadia/direct/DirectBatchContext.java index 2dbcebc..e51e34b 100644 --- a/direct-batch/src/main/java/io/synadia/direct/DirectBatchContext.java +++ b/direct-batch/src/main/java/io/synadia/direct/DirectBatchContext.java @@ -34,7 +34,7 @@ public class DirectBatchContext { * @throws JetStreamApiException the request had an error related to the data */ public DirectBatchContext(Connection conn, String streamName) throws IOException, JetStreamApiException { - this(conn, null, streamName); + this(conn, null, streamName, null); } /** @@ -47,6 +47,10 @@ public DirectBatchContext(Connection conn, String streamName) throws IOException * @throws JetStreamApiException the request had an error related to the data */ public DirectBatchContext(Connection conn, JetStreamOptions jso, String streamName) throws IOException, JetStreamApiException { + this(conn, jso, streamName, null); + } + + public DirectBatchContext(Connection conn, JetStreamOptions jso, String streamName, StreamInfo si) throws IOException, JetStreamApiException { validateNotNull(conn, "Connection required,"); if (!conn.getServerInfo().isNewerVersionThan("2.10.99")) { throw new IllegalArgumentException("Batch direct get not available until server version 2.11.0."); @@ -55,8 +59,14 @@ public DirectBatchContext(Connection conn, JetStreamOptions jso, String streamNa this.jso = jso == null ? DEFAULT_JS_OPTIONS : jso; JetStreamManagement jsm = conn.jetStreamManagement(this.jso); - this.streamName = required(streamName, "Stream name required,"); - StreamInfo si = jsm.getStreamInfo(streamName); + if (si == null) { + this.streamName = required(streamName, "Stream name required,"); + si = jsm.getStreamInfo(streamName); + } + else { + this.streamName = si.getConfiguration().getName(); + } + if (!si.getConfiguration().getAllowDirect()) { throw new IllegalArgumentException("Stream must have allow direct set."); } diff --git a/direct-batch/src/main/java/io/synadia/direct/MessageBatchGetRequest.java b/direct-batch/src/main/java/io/synadia/direct/MessageBatchGetRequest.java index 8f8dec8..501ae3c 100644 --- a/direct-batch/src/main/java/io/synadia/direct/MessageBatchGetRequest.java +++ b/direct-batch/src/main/java/io/synadia/direct/MessageBatchGetRequest.java @@ -2,6 +2,7 @@ import io.nats.client.support.JsonSerializable; import io.nats.client.support.Validator; +import org.jspecify.annotations.NonNull; import java.time.ZonedDateTime; import java.util.List; @@ -22,6 +23,7 @@ public class MessageBatchGetRequest implements JsonSerializable { private final List multiLastBySubjects; private final long upToSequence; private final ZonedDateTime upToTime; + private final boolean noHeaders; // batch constructor private MessageBatchGetRequest(String subject, @@ -37,8 +39,40 @@ private MessageBatchGetRequest(String subject, this.multiLastBySubjects = null; this.upToSequence = -1; this.upToTime = null; - this.minSequence = startTime == null && minSequence < 1 ? 1 : minSequence; + noHeaders = false; + } + + // multi last for constructor + private MessageBatchGetRequest(List subjects, long upToSequence, ZonedDateTime upToTime, int batch) { + if (subjects == null || subjects.isEmpty()) { + throw new IllegalArgumentException("Subjects are required."); + } + this.batch = batch; + nextBySubject = null; + this.maxBytes = -1; + this.minSequence = -1; + this.startTime = null; + this.multiLastBySubjects = subjects; + this.upToSequence = upToSequence; + this.upToTime = upToTime; + noHeaders = false; + } + + private MessageBatchGetRequest(MessageBatchGetRequest r, boolean noHeaders) { + this.batch = r.batch; + this.nextBySubject = r.nextBySubject; + this.maxBytes = r.maxBytes; + this.minSequence = r.minSequence; + this.startTime = r.startTime; + this.multiLastBySubjects = r.multiLastBySubjects; + this.upToSequence = r.upToSequence; + this.upToTime = r.upToTime; + this.noHeaders = noHeaders; + } + + public MessageBatchGetRequest noHeaders() { + return new MessageBatchGetRequest(this, true); } /** @@ -108,21 +142,6 @@ public static MessageBatchGetRequest batchBytes(String subject, int batch, int m return new MessageBatchGetRequest(subject, batch, maxBytes, -1, startTime); } - // multi last for constructor - private MessageBatchGetRequest(List subjects, long upToSequence, ZonedDateTime upToTime, int batch) { - if (subjects == null || subjects.isEmpty()) { - throw new IllegalArgumentException("Subjects are required."); - } - this.batch = batch; - nextBySubject = null; - this.maxBytes = -1; - this.minSequence = -1; - this.startTime = null; - this.multiLastBySubjects = subjects; - this.upToSequence = upToSequence; - this.upToTime = upToTime; - } - /** * Get the last messages for the subjects specified subject * @param subjects the subjects, may include wildcards. @@ -253,6 +272,7 @@ public ZonedDateTime getUpToTime() { } @Override + @NonNull public String toJson() { StringBuilder sb = beginJson(); addField(sb, BATCH, batch); @@ -263,6 +283,7 @@ public String toJson() { addStrings(sb, MULTI_LAST, multiLastBySubjects); addField(sb, UP_TO_SEQ, upToSequence); addField(sb, UP_TO_TIME, upToTime); + addFldWhenTrue(sb, NO_HDR, noHeaders); return endJson(sb).toString(); } From 8a27d2c5ffad971b389e6ea588dcc4997d2c2561 Mon Sep 17 00:00:00 2001 From: scottf Date: Sun, 24 Aug 2025 07:32:58 -0400 Subject: [PATCH 2/2] finished counter example --- counter/build.gradle | 8 +- .../examples/CounterContextExample.java | 163 +++++++++++------- .../io/synadia/counter/CounterContext.java | 50 +++++- .../java/io/synadia/counter/CounterEntry.java | 2 +- .../java/io/synadia/counter/CounterValue.java | 92 ++++++++++ 5 files changed, 242 insertions(+), 73 deletions(-) create mode 100644 counter/src/main/java/io/synadia/counter/CounterValue.java diff --git a/counter/build.gradle b/counter/build.gradle index 8bbb6ff..3d92b16 100644 --- a/counter/build.gradle +++ b/counter/build.gradle @@ -39,7 +39,7 @@ repositories { dependencies { implementation 'io.nats:jnats:2.22.0.2_12-SNAPSHOT' - implementation 'io.synadia:direct-batch:0.1.2' + implementation 'io.synadia:direct-batch:0.1.3-SNAPSHOT' implementation 'org.jspecify:jspecify:1.0.0' testImplementation 'commons-codec:commons-codec:1.18.0' @@ -49,6 +49,10 @@ dependencies { testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.12.3' } +configurations.configureEach { + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' +} + sourceSets { main { java { @@ -204,4 +208,4 @@ if (isRelease) { sign configurations.archives sign publishing.publications.mavenJava } -} \ No newline at end of file +} diff --git a/counter/src/examples/java/io/synadia/examples/CounterContextExample.java b/counter/src/examples/java/io/synadia/examples/CounterContextExample.java index 468ecad..72c8d2e 100644 --- a/counter/src/examples/java/io/synadia/examples/CounterContextExample.java +++ b/counter/src/examples/java/io/synadia/examples/CounterContextExample.java @@ -10,6 +10,7 @@ import io.nats.client.api.StreamConfiguration; import io.synadia.counter.CounterContext; import io.synadia.counter.CounterEntry; +import io.synadia.counter.CounterValue; import java.math.BigInteger; import java.util.concurrent.LinkedBlockingQueue; @@ -36,79 +37,117 @@ public static void main(String[] args) throws Exception { .build()); System.out.println("1: Add to a subject..."); - System.out.println("add(\"" + SUBJECT_A + "\", 1) -> " + counter.add(SUBJECT_A, 1)); - System.out.println("add(\"" + SUBJECT_A + "\", 2) -> " + counter.add(SUBJECT_A, 2)); - System.out.println("add(\"" + SUBJECT_A + "\", 3) -> " + counter.add(SUBJECT_A, 3)); - System.out.println("add(\"" + SUBJECT_A + "\", -1) -> " + counter.add(SUBJECT_A, -1)); - - System.out.println("add(\"" + SUBJECT_B + "\", 10) -> " + counter.add(SUBJECT_B, 10)); - System.out.println("add(\"" + SUBJECT_B + "\", 20) -> " + counter.add(SUBJECT_B, 20)); - System.out.println("add(\"" + SUBJECT_B + "\", 30) -> " + counter.add(SUBJECT_B, 30)); - System.out.println("add(\"" + SUBJECT_B + "\", -10) -> " + counter.add(SUBJECT_B, -10)); - - System.out.println("add(\"" + SUBJECT_C + "\", 100) -> " + counter.add(SUBJECT_C, 100)); - System.out.println("add(\"" + SUBJECT_C + "\", 200) -> " + counter.add(SUBJECT_C, 200)); - System.out.println("add(\"" + SUBJECT_C + "\", 300) -> " + counter.add(SUBJECT_C, 300)); - System.out.println("add(\"" + SUBJECT_C + "\", -100) -> " + counter.add(SUBJECT_C, -100)); - - System.out.println("\n2.1: Get the value for existing subjects"); - System.out.println("get(\"" + SUBJECT_A + "\") -> " + counter.get(SUBJECT_A)); - System.out.println("get(\"" + SUBJECT_B + "\") -> " + counter.get(SUBJECT_B)); - System.out.println("get(\"" + SUBJECT_C + "\") -> " + counter.get(SUBJECT_C)); - - System.out.println("\n2.2 Get the value if subject not found"); + System.out.println(" add(\"" + SUBJECT_A + "\", 1) -> " + counter.add(SUBJECT_A, 1)); + System.out.println(" add(\"" + SUBJECT_A + "\", 2) -> " + counter.add(SUBJECT_A, 2)); + System.out.println(" add(\"" + SUBJECT_A + "\", 3) -> " + counter.add(SUBJECT_A, 3)); + System.out.println(" add(\"" + SUBJECT_A + "\", -1) -> " + counter.add(SUBJECT_A, -1)); + + System.out.println(" add(\"" + SUBJECT_B + "\", 10) -> " + counter.add(SUBJECT_B, 10)); + System.out.println(" add(\"" + SUBJECT_B + "\", 20) -> " + counter.add(SUBJECT_B, 20)); + System.out.println(" add(\"" + SUBJECT_B + "\", 30) -> " + counter.add(SUBJECT_B, 30)); + System.out.println(" add(\"" + SUBJECT_B + "\", -10) -> " + counter.add(SUBJECT_B, -10)); + + System.out.println(" add(\"" + SUBJECT_C + "\", 100) -> " + counter.add(SUBJECT_C, 100)); + System.out.println(" add(\"" + SUBJECT_C + "\", 200) -> " + counter.add(SUBJECT_C, 200)); + System.out.println(" add(\"" + SUBJECT_C + "\", 300) -> " + counter.add(SUBJECT_C, 300)); + System.out.println(" add(\"" + SUBJECT_C + "\", -100) -> " + counter.add(SUBJECT_C, -100)); + + System.out.println("\n2.1: get() for existing subjects"); + System.out.println(" get(\"" + SUBJECT_A + "\") -> " + counter.get(SUBJECT_A)); + System.out.println(" get(\"" + SUBJECT_B + "\") -> " + counter.get(SUBJECT_B)); + System.out.println(" get(\"" + SUBJECT_C + "\") -> " + counter.get(SUBJECT_C)); + + System.out.println("\n2.2: get() when the subject is not found"); try { - counter.get("cs.X"); + counter.get("not-found"); } catch (JetStreamApiException e) { - System.out.println("get(\"X\") -> " + e); + System.out.println(" get(\"not-found\") -> " + e); } - System.out.println("\n3: Get the full entry for a subject, notice the last increment..."); - System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); - System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); - System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); + System.out.println("\n3: getEntry() - The full CounterEntry for a subject, notice the last increment..."); + System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); + System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); + System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); - System.out.println("\n4: Get multiples entries - maybe to total them up"); - LinkedBlockingQueue q = counter.getEntries(SUBJECT_A, SUBJECT_B, SUBJECT_C); + System.out.println("\n4.1: getValues() - Get the CounterValue object for multiple subjects. Maybe to total them up?\""); + LinkedBlockingQueue qv = counter.getValues(SUBJECT_A, SUBJECT_B, SUBJECT_C); BigInteger total = BigInteger.ZERO; - CounterEntry entry = q.poll(1, TimeUnit.SECONDS); + CounterValue value = qv.poll(1, TimeUnit.SECONDS); + while (value != null && value.isValue()) { + System.out.println(" " + value); + total = total.add(value.value); + value = qv.poll(10, TimeUnit.MILLISECONDS); + } + System.out.println(" The iteration is signaled done when the CounterValue is a status: " + value); + System.out.println(" Values totaled: " + total); + + System.out.println("\n4.2: getEntries() - Get CounterEntry for multiple subjects."); + LinkedBlockingQueue qe = counter.getEntries(SUBJECT_A, SUBJECT_B, SUBJECT_C); + CounterEntry entry = qe.poll(1, TimeUnit.SECONDS); + while (entry != null && entry.isEntry()) { + System.out.println(" " + entry); + entry = qe.poll(10, TimeUnit.MILLISECONDS); + } + System.out.println(" CounterEntry status 204 signals no more entries: " + entry); + + System.out.println("\n5.1: set() - Set the value for a subject"); + System.out.println(" set(\"" + SUBJECT_A + "\", 9) -> " + counter.set(SUBJECT_A, 9)); + System.out.println(" set(\"" + SUBJECT_B + "\", 99) -> " + counter.set(SUBJECT_B, 99)); + System.out.println(" set(\"" + SUBJECT_C + "\", 999) -> " + counter.set(SUBJECT_C, 999)); + + System.out.println("\n5.2: getEntry() - Get the full CounterEntry, notice the last increment after a set represents" + + "\n the difference between the entry before the set and the set value."); + System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); + System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); + System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); + + System.out.println("\n6.1: zero() is a shortcut to set the value of a subject to 0"); + System.out.println(" zero(\"" + SUBJECT_A + "\") -> " + counter.zero(SUBJECT_A)); + System.out.println(" zero(\"" + SUBJECT_B + "\") -> " + counter.zero(SUBJECT_B)); + System.out.println(" zero(\"" + SUBJECT_C + "\") -> " + counter.zero(SUBJECT_C)); + + System.out.println("\n6.2: getEntry() - Get the full CounterEntry, notice the last increment after a zero represents" + + "\n the difference between the entry before the zero and zero."); + System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); + System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); + System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); + + System.out.println("\n7.1: getValues() - Get multiple CounterValue - but no subjects have counters"); + qv = counter.getValues("no-counters", "also-counters"); + value = qv.poll(1, TimeUnit.SECONDS); + while (value != null && value.isValue()) { + System.out.println(" " + value); + value = qv.poll(10, TimeUnit.MILLISECONDS); + } + System.out.println(" CounterValue status 204 signals no more entries: " + value); + + System.out.println("\n7.2: getEntries() - Get multiple CounterEntry - but no subjects have counters"); + qe = counter.getEntries("no-counters", "also-counters"); + entry = qe.poll(1, TimeUnit.SECONDS); while (entry != null && entry.isEntry()) { - System.out.println("Entry: " + entry); - total = total.add(entry.value); - entry = q.poll(10, TimeUnit.MILLISECONDS); + System.out.println(" " + entry); + entry = qe.poll(10, TimeUnit.MILLISECONDS); } - System.out.println("The last entry was: " + entry); - System.out.println("Entries Totaled: " + total); - - System.out.println("\n5.1: Set the value for a subject"); - System.out.println("set(\"" + SUBJECT_A + "\", 9) -> " + counter.set(SUBJECT_A, 9)); - System.out.println("set(\"" + SUBJECT_B + "\", 99) -> " + counter.set(SUBJECT_B, 99)); - System.out.println("set(\"" + SUBJECT_C + "\", 999) -> " + counter.set(SUBJECT_C, 999)); - - System.out.println("\n5.2: Get the full entry again, notice the last increment after a set..."); - System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); - System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); - System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); - - System.out.println("\n6.1: Zero is a shortcut to set the value for a subject to 0"); - System.out.println("zero(\"" + SUBJECT_A + "\") -> " + counter.zero(SUBJECT_A)); - System.out.println("zero(\"" + SUBJECT_B + "\") -> " + counter.zero(SUBJECT_B)); - System.out.println("zero(\"" + SUBJECT_C + "\") -> " + counter.zero(SUBJECT_C)); - - System.out.println("\n6.2: Get the full entry again, notice the last increment after a zero..."); - System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A)); - System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B)); - System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C)); - - System.out.println("\n7: Get multiples entries - but subject doesn't have any counters"); - q = counter.getEntries("not-no counters"); - entry = q.poll(1, TimeUnit.SECONDS); + System.out.println(" The only CounterEntry received was a 404: " + entry); + + System.out.println("\n8.1: getValues() - Get multiple CounterValue - some subjects have any counters"); + qv = counter.getValues("no-counters", SUBJECT_A, SUBJECT_B, SUBJECT_C); + value = qv.poll(1, TimeUnit.SECONDS); + while (value != null && value.isValue()) { + System.out.println(" " + value); + value = qv.poll(10, TimeUnit.MILLISECONDS); + } + System.out.println(" CounterValue status 204 signals no more entries: " + value); + + System.out.println("\n8.2: getEntries() - Get multiple CounterEntry - some subjects have any counters"); + qe = counter.getEntries("no-counters", SUBJECT_A, SUBJECT_B, SUBJECT_C); + entry = qe.poll(1, TimeUnit.SECONDS); while (entry != null && entry.isEntry()) { - System.out.println("Entry: " + entry); - entry = q.poll(10, TimeUnit.MILLISECONDS); + System.out.println(" " + entry); + entry = qe.poll(10, TimeUnit.MILLISECONDS); } - System.out.println("The last entry was: " + entry); + System.out.println(" CounterEntry status 204 signals no more entries: " + entry); } } } diff --git a/counter/src/main/java/io/synadia/counter/CounterContext.java b/counter/src/main/java/io/synadia/counter/CounterContext.java index 87c9f06..d0ae876 100644 --- a/counter/src/main/java/io/synadia/counter/CounterContext.java +++ b/counter/src/main/java/io/synadia/counter/CounterContext.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import static io.nats.client.support.Validator.required; import static io.synadia.counter.CounterUtils.INCREMENT_HEADER; import static io.synadia.counter.CounterUtils.extractVal; @@ -39,9 +40,9 @@ public static CounterContext createCounterStream(Connection conn, JetStreamOptio .build(); JetStreamManagement jsm = conn.jetStreamManagement(jso); - jsm.addStream(config); + StreamInfo si = jsm.addStream(config); - return new CounterContext(config.getName(), conn, jso, jsm); + return new CounterContext(config.getName(), conn, jso, jsm, si); } private final String streamName; @@ -50,18 +51,40 @@ public static CounterContext createCounterStream(Connection conn, JetStreamOptio private final DirectBatchContext dbCtx; public CounterContext(String streamName, Connection conn) throws IOException, JetStreamApiException { - this(streamName, conn, null, null); + this(streamName, conn, null, null, null); } public CounterContext(String streamName, Connection conn, JetStreamOptions jso) throws IOException, JetStreamApiException { - this(streamName, conn, jso, null); + this(streamName, conn, jso, null, null); } - private CounterContext(@NonNull String streamName, @NonNull Connection conn, @Nullable JetStreamOptions jso, @Nullable JetStreamManagement jsm) throws IOException, JetStreamApiException { - this.streamName = streamName; + private CounterContext(@NonNull String streamName, + @NonNull Connection conn, + @Nullable JetStreamOptions jso, + @Nullable JetStreamManagement jsm, + @Nullable StreamInfo si + ) throws IOException, JetStreamApiException + { this.jsm = jsm == null ? conn.jetStreamManagement(jso) : jsm; js = this.jsm.jetStream(); - dbCtx = new DirectBatchContext(conn, jso, streamName); + + if (si == null) { + this.streamName = required(streamName, "Stream name required,"); + si = this.jsm.getStreamInfo(streamName); + } + else { + this.streamName = si.getConfiguration().getName(); + } + + if (!si.getConfiguration().getAllowDirect()) { + throw new IllegalArgumentException("Stream must have allow direct set."); + } + + if (!si.getConfiguration().getAllowMessageCounter()) { + throw new IllegalArgumentException("Stream must have allow message counter set."); + } + + dbCtx = new DirectBatchContext(conn, jso, streamName, si); } private BigInteger _add(String subject, String sv) throws IOException, JetStreamApiException { @@ -114,10 +137,21 @@ public BigInteger zero(String subject) throws JetStreamApiException, IOException } public BigInteger get(String subject) throws JetStreamApiException, IOException { - MessageInfo mi = jsm.getLastMessage(streamName, subject); + MessageInfo mi = jsm.getMessage(streamName, MessageGetRequest.lastForSubject(subject).noHeaders()); return new BigInteger(extractVal(mi.getData())); } + public LinkedBlockingQueue getValues(String... subjects) { + return getValues(Arrays.asList(subjects)); + } + + public LinkedBlockingQueue getValues(List subjects) { + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + MessageBatchGetRequest mbgr = MessageBatchGetRequest.multiLastForSubjects(subjects); + dbCtx.requestMessageBatch(mbgr, mi -> queue.add(new CounterValue(mi))); + return queue; + } + public CounterEntry getEntry(String subject) throws JetStreamApiException, IOException { MessageInfo mi = jsm.getLastMessage(streamName, subject); return new CounterEntry(mi); diff --git a/counter/src/main/java/io/synadia/counter/CounterEntry.java b/counter/src/main/java/io/synadia/counter/CounterEntry.java index a547885..351d4bd 100644 --- a/counter/src/main/java/io/synadia/counter/CounterEntry.java +++ b/counter/src/main/java/io/synadia/counter/CounterEntry.java @@ -38,7 +38,7 @@ public class CounterEntry { } /** - * Whether this CounterEntry is a regular entry + * Whether this CounterEntry is a regular entry as opposed to an error/status * @return true if the CounterEntry is a regular entry */ public boolean isEntry() { diff --git a/counter/src/main/java/io/synadia/counter/CounterValue.java b/counter/src/main/java/io/synadia/counter/CounterValue.java new file mode 100644 index 0000000..474aefa --- /dev/null +++ b/counter/src/main/java/io/synadia/counter/CounterValue.java @@ -0,0 +1,92 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.counter; + +import io.nats.client.api.MessageInfo; +import io.nats.client.support.Status; +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +import java.math.BigInteger; + +import static io.synadia.counter.CounterUtils.extractVal; + +public class CounterValue { + public final String subject; + public final BigInteger value; + public final Status status; + + CounterValue(MessageInfo mi) { + this.status = mi.getStatus(); + if (mi.isMessage()) { + this.subject = mi.getSubject(); + this.value = new BigInteger(extractVal(mi.getData())); + } + else { + this.subject = ""; + this.value = BigInteger.ZERO; + } + } + + /** + * Whether this CounterValue is a regular value as opposed to an error/status + * @return true if the CounterEntry is a regular value + */ + public boolean isValue() { + return status == null; + } + + /** + * Whether this CounterEntry is a status message + * @return true if this CounterEntry is a status message + */ + public boolean isStatus() { + return status != null; + } + + /** + * Whether this CounterEntry is a status message and is a direct EOB status + * @return true if this CounterEntry is a status message and is a direct EOB status + */ + public boolean isEobStatus() { + return status != null && status.isEob(); + } + + /** + * Whether this CounterEntry is a status message and is an error status + * @return true if this CounterEntry is a status message and is an error status + */ + public boolean isErrorStatus() { + return status != null && !status.isEob(); + } + + @NonNull + public String getSubject() { + return subject; + } + + @NonNull + public BigInteger getValue() { + return value; + } + + @Nullable + public Status getStatus() { + return status; + } + + @Override + public String toString() { + if (isValue()) { + return "CounterValue{" + + "subject='" + subject + '\'' + + ", value=" + value + + '}'; + } + + return "CounterValue{" + + "status=" + status + + '}'; + } +}