Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions counter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ repositories {

dependencies {
implementation 'io.nats:jnats:2.22.0.2_12-SNAPSHOT'
implementation 'io.synadia:direct-batch:0.1.2'
implementation 'io.synadia:direct-batch:0.1.3-SNAPSHOT'
implementation 'org.jspecify:jspecify:1.0.0'

testImplementation 'commons-codec:commons-codec:1.18.0'
Expand All @@ -49,6 +49,10 @@ dependencies {
testImplementation 'nl.jqno.equalsverifier:equalsverifier:3.12.3'
}

configurations.configureEach {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
}

sourceSets {
main {
java {
Expand Down Expand Up @@ -204,4 +208,4 @@ if (isRelease) {
sign configurations.archives
sign publishing.publications.mavenJava
}
}
}
163 changes: 101 additions & 62 deletions counter/src/examples/java/io/synadia/examples/CounterContextExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.nats.client.api.StreamConfiguration;
import io.synadia.counter.CounterContext;
import io.synadia.counter.CounterEntry;
import io.synadia.counter.CounterValue;

import java.math.BigInteger;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -36,79 +37,117 @@ public static void main(String[] args) throws Exception {
.build());

System.out.println("1: Add to a subject...");
System.out.println("add(\"" + SUBJECT_A + "\", 1) -> " + counter.add(SUBJECT_A, 1));
System.out.println("add(\"" + SUBJECT_A + "\", 2) -> " + counter.add(SUBJECT_A, 2));
System.out.println("add(\"" + SUBJECT_A + "\", 3) -> " + counter.add(SUBJECT_A, 3));
System.out.println("add(\"" + SUBJECT_A + "\", -1) -> " + counter.add(SUBJECT_A, -1));

System.out.println("add(\"" + SUBJECT_B + "\", 10) -> " + counter.add(SUBJECT_B, 10));
System.out.println("add(\"" + SUBJECT_B + "\", 20) -> " + counter.add(SUBJECT_B, 20));
System.out.println("add(\"" + SUBJECT_B + "\", 30) -> " + counter.add(SUBJECT_B, 30));
System.out.println("add(\"" + SUBJECT_B + "\", -10) -> " + counter.add(SUBJECT_B, -10));

System.out.println("add(\"" + SUBJECT_C + "\", 100) -> " + counter.add(SUBJECT_C, 100));
System.out.println("add(\"" + SUBJECT_C + "\", 200) -> " + counter.add(SUBJECT_C, 200));
System.out.println("add(\"" + SUBJECT_C + "\", 300) -> " + counter.add(SUBJECT_C, 300));
System.out.println("add(\"" + SUBJECT_C + "\", -100) -> " + counter.add(SUBJECT_C, -100));

System.out.println("\n2.1: Get the value for existing subjects");
System.out.println("get(\"" + SUBJECT_A + "\") -> " + counter.get(SUBJECT_A));
System.out.println("get(\"" + SUBJECT_B + "\") -> " + counter.get(SUBJECT_B));
System.out.println("get(\"" + SUBJECT_C + "\") -> " + counter.get(SUBJECT_C));

System.out.println("\n2.2 Get the value if subject not found");
System.out.println(" add(\"" + SUBJECT_A + "\", 1) -> " + counter.add(SUBJECT_A, 1));
System.out.println(" add(\"" + SUBJECT_A + "\", 2) -> " + counter.add(SUBJECT_A, 2));
System.out.println(" add(\"" + SUBJECT_A + "\", 3) -> " + counter.add(SUBJECT_A, 3));
System.out.println(" add(\"" + SUBJECT_A + "\", -1) -> " + counter.add(SUBJECT_A, -1));

System.out.println(" add(\"" + SUBJECT_B + "\", 10) -> " + counter.add(SUBJECT_B, 10));
System.out.println(" add(\"" + SUBJECT_B + "\", 20) -> " + counter.add(SUBJECT_B, 20));
System.out.println(" add(\"" + SUBJECT_B + "\", 30) -> " + counter.add(SUBJECT_B, 30));
System.out.println(" add(\"" + SUBJECT_B + "\", -10) -> " + counter.add(SUBJECT_B, -10));

System.out.println(" add(\"" + SUBJECT_C + "\", 100) -> " + counter.add(SUBJECT_C, 100));
System.out.println(" add(\"" + SUBJECT_C + "\", 200) -> " + counter.add(SUBJECT_C, 200));
System.out.println(" add(\"" + SUBJECT_C + "\", 300) -> " + counter.add(SUBJECT_C, 300));
System.out.println(" add(\"" + SUBJECT_C + "\", -100) -> " + counter.add(SUBJECT_C, -100));

System.out.println("\n2.1: get() for existing subjects");
System.out.println(" get(\"" + SUBJECT_A + "\") -> " + counter.get(SUBJECT_A));
System.out.println(" get(\"" + SUBJECT_B + "\") -> " + counter.get(SUBJECT_B));
System.out.println(" get(\"" + SUBJECT_C + "\") -> " + counter.get(SUBJECT_C));

System.out.println("\n2.2: get() when the subject is not found");
try {
counter.get("cs.X");
counter.get("not-found");
}
catch (JetStreamApiException e) {
System.out.println("get(\"X\") -> " + e);
System.out.println(" get(\"not-found\") -> " + e);
}

System.out.println("\n3: Get the full entry for a subject, notice the last increment...");
System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));
System.out.println("\n3: getEntry() - The full CounterEntry for a subject, notice the last increment...");
System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));

System.out.println("\n4: Get multiples entries - maybe to total them up");
LinkedBlockingQueue<CounterEntry> q = counter.getEntries(SUBJECT_A, SUBJECT_B, SUBJECT_C);
System.out.println("\n4.1: getValues() - Get the CounterValue object for multiple subjects. Maybe to total them up?\"");
LinkedBlockingQueue<CounterValue> qv = counter.getValues(SUBJECT_A, SUBJECT_B, SUBJECT_C);
BigInteger total = BigInteger.ZERO;
CounterEntry entry = q.poll(1, TimeUnit.SECONDS);
CounterValue value = qv.poll(1, TimeUnit.SECONDS);
while (value != null && value.isValue()) {
System.out.println(" " + value);
total = total.add(value.value);
value = qv.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println(" The iteration is signaled done when the CounterValue is a status: " + value);
System.out.println(" Values totaled: " + total);

System.out.println("\n4.2: getEntries() - Get CounterEntry for multiple subjects.");
LinkedBlockingQueue<CounterEntry> qe = counter.getEntries(SUBJECT_A, SUBJECT_B, SUBJECT_C);
CounterEntry entry = qe.poll(1, TimeUnit.SECONDS);
while (entry != null && entry.isEntry()) {
System.out.println(" " + entry);
entry = qe.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println(" CounterEntry status 204 signals no more entries: " + entry);

System.out.println("\n5.1: set() - Set the value for a subject");
System.out.println(" set(\"" + SUBJECT_A + "\", 9) -> " + counter.set(SUBJECT_A, 9));
System.out.println(" set(\"" + SUBJECT_B + "\", 99) -> " + counter.set(SUBJECT_B, 99));
System.out.println(" set(\"" + SUBJECT_C + "\", 999) -> " + counter.set(SUBJECT_C, 999));

System.out.println("\n5.2: getEntry() - Get the full CounterEntry, notice the last increment after a set represents" +
"\n the difference between the entry before the set and the set value.");
System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));

System.out.println("\n6.1: zero() is a shortcut to set the value of a subject to 0");
System.out.println(" zero(\"" + SUBJECT_A + "\") -> " + counter.zero(SUBJECT_A));
System.out.println(" zero(\"" + SUBJECT_B + "\") -> " + counter.zero(SUBJECT_B));
System.out.println(" zero(\"" + SUBJECT_C + "\") -> " + counter.zero(SUBJECT_C));

System.out.println("\n6.2: getEntry() - Get the full CounterEntry, notice the last increment after a zero represents" +
"\n the difference between the entry before the zero and zero.");
System.out.println(" getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println(" getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println(" getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));

System.out.println("\n7.1: getValues() - Get multiple CounterValue - but no subjects have counters");
qv = counter.getValues("no-counters", "also-counters");
value = qv.poll(1, TimeUnit.SECONDS);
while (value != null && value.isValue()) {
System.out.println(" " + value);
value = qv.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println(" CounterValue status 204 signals no more entries: " + value);

System.out.println("\n7.2: getEntries() - Get multiple CounterEntry - but no subjects have counters");
qe = counter.getEntries("no-counters", "also-counters");
entry = qe.poll(1, TimeUnit.SECONDS);
while (entry != null && entry.isEntry()) {
System.out.println("Entry: " + entry);
total = total.add(entry.value);
entry = q.poll(10, TimeUnit.MILLISECONDS);
System.out.println(" " + entry);
entry = qe.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println("The last entry was: " + entry);
System.out.println("Entries Totaled: " + total);

System.out.println("\n5.1: Set the value for a subject");
System.out.println("set(\"" + SUBJECT_A + "\", 9) -> " + counter.set(SUBJECT_A, 9));
System.out.println("set(\"" + SUBJECT_B + "\", 99) -> " + counter.set(SUBJECT_B, 99));
System.out.println("set(\"" + SUBJECT_C + "\", 999) -> " + counter.set(SUBJECT_C, 999));

System.out.println("\n5.2: Get the full entry again, notice the last increment after a set...");
System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));

System.out.println("\n6.1: Zero is a shortcut to set the value for a subject to 0");
System.out.println("zero(\"" + SUBJECT_A + "\") -> " + counter.zero(SUBJECT_A));
System.out.println("zero(\"" + SUBJECT_B + "\") -> " + counter.zero(SUBJECT_B));
System.out.println("zero(\"" + SUBJECT_C + "\") -> " + counter.zero(SUBJECT_C));

System.out.println("\n6.2: Get the full entry again, notice the last increment after a zero...");
System.out.println("getEntry(\"" + SUBJECT_A + "\") -> " + counter.getEntry(SUBJECT_A));
System.out.println("getEntry(\"" + SUBJECT_B + "\") -> " + counter.getEntry(SUBJECT_B));
System.out.println("getEntry(\"" + SUBJECT_C + "\") -> " + counter.getEntry(SUBJECT_C));

System.out.println("\n7: Get multiples entries - but subject doesn't have any counters");
q = counter.getEntries("not-no counters");
entry = q.poll(1, TimeUnit.SECONDS);
System.out.println(" The only CounterEntry received was a 404: " + entry);

System.out.println("\n8.1: getValues() - Get multiple CounterValue - some subjects have any counters");
qv = counter.getValues("no-counters", SUBJECT_A, SUBJECT_B, SUBJECT_C);
value = qv.poll(1, TimeUnit.SECONDS);
while (value != null && value.isValue()) {
System.out.println(" " + value);
value = qv.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println(" CounterValue status 204 signals no more entries: " + value);

System.out.println("\n8.2: getEntries() - Get multiple CounterEntry - some subjects have any counters");
qe = counter.getEntries("no-counters", SUBJECT_A, SUBJECT_B, SUBJECT_C);
entry = qe.poll(1, TimeUnit.SECONDS);
while (entry != null && entry.isEntry()) {
System.out.println("Entry: " + entry);
entry = q.poll(10, TimeUnit.MILLISECONDS);
System.out.println(" " + entry);
entry = qe.poll(10, TimeUnit.MILLISECONDS);
}
System.out.println("The last entry was: " + entry);
System.out.println(" CounterEntry status 204 signals no more entries: " + entry);
}
}
}
50 changes: 42 additions & 8 deletions counter/src/main/java/io/synadia/counter/CounterContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import static io.nats.client.support.Validator.required;
import static io.synadia.counter.CounterUtils.INCREMENT_HEADER;
import static io.synadia.counter.CounterUtils.extractVal;

Expand All @@ -39,9 +40,9 @@ public static CounterContext createCounterStream(Connection conn, JetStreamOptio
.build();

JetStreamManagement jsm = conn.jetStreamManagement(jso);
jsm.addStream(config);
StreamInfo si = jsm.addStream(config);

return new CounterContext(config.getName(), conn, jso, jsm);
return new CounterContext(config.getName(), conn, jso, jsm, si);
}

private final String streamName;
Expand All @@ -50,18 +51,40 @@ public static CounterContext createCounterStream(Connection conn, JetStreamOptio
private final DirectBatchContext dbCtx;

public CounterContext(String streamName, Connection conn) throws IOException, JetStreamApiException {
this(streamName, conn, null, null);
this(streamName, conn, null, null, null);
}

public CounterContext(String streamName, Connection conn, JetStreamOptions jso) throws IOException, JetStreamApiException {
this(streamName, conn, jso, null);
this(streamName, conn, jso, null, null);
}

private CounterContext(@NonNull String streamName, @NonNull Connection conn, @Nullable JetStreamOptions jso, @Nullable JetStreamManagement jsm) throws IOException, JetStreamApiException {
this.streamName = streamName;
private CounterContext(@NonNull String streamName,
@NonNull Connection conn,
@Nullable JetStreamOptions jso,
@Nullable JetStreamManagement jsm,
@Nullable StreamInfo si
) throws IOException, JetStreamApiException
{
this.jsm = jsm == null ? conn.jetStreamManagement(jso) : jsm;
js = this.jsm.jetStream();
dbCtx = new DirectBatchContext(conn, jso, streamName);

if (si == null) {
this.streamName = required(streamName, "Stream name required,");
si = this.jsm.getStreamInfo(streamName);
}
else {
this.streamName = si.getConfiguration().getName();
}

if (!si.getConfiguration().getAllowDirect()) {
throw new IllegalArgumentException("Stream must have allow direct set.");
}

if (!si.getConfiguration().getAllowMessageCounter()) {
throw new IllegalArgumentException("Stream must have allow message counter set.");
}

dbCtx = new DirectBatchContext(conn, jso, streamName, si);
}

private BigInteger _add(String subject, String sv) throws IOException, JetStreamApiException {
Expand Down Expand Up @@ -114,10 +137,21 @@ public BigInteger zero(String subject) throws JetStreamApiException, IOException
}

public BigInteger get(String subject) throws JetStreamApiException, IOException {
MessageInfo mi = jsm.getLastMessage(streamName, subject);
MessageInfo mi = jsm.getMessage(streamName, MessageGetRequest.lastForSubject(subject).noHeaders());
return new BigInteger(extractVal(mi.getData()));
}

public LinkedBlockingQueue<CounterValue> getValues(String... subjects) {
return getValues(Arrays.asList(subjects));
}

public LinkedBlockingQueue<CounterValue> getValues(List<String> subjects) {
LinkedBlockingQueue<CounterValue> queue = new LinkedBlockingQueue<>();
MessageBatchGetRequest mbgr = MessageBatchGetRequest.multiLastForSubjects(subjects);
dbCtx.requestMessageBatch(mbgr, mi -> queue.add(new CounterValue(mi)));
return queue;
}

public CounterEntry getEntry(String subject) throws JetStreamApiException, IOException {
MessageInfo mi = jsm.getLastMessage(streamName, subject);
return new CounterEntry(mi);
Expand Down
2 changes: 1 addition & 1 deletion counter/src/main/java/io/synadia/counter/CounterEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class CounterEntry {
}

/**
* Whether this CounterEntry is a regular entry
* Whether this CounterEntry is a regular entry as opposed to an error/status
* @return true if the CounterEntry is a regular entry
*/
public boolean isEntry() {
Expand Down
Loading
Loading