diff --git a/.github/workflows/pcg-main.yml b/.github/workflows/pcg-main.yml
index 9f30756..0292b94 100644
--- a/.github/workflows/pcg-main.yml
+++ b/.github/workflows/pcg-main.yml
@@ -9,7 +9,7 @@ on:
jobs:
build:
- uses: ./.github/workflows/workflow-main.yml
+ uses: synadia-io/workflows/.github/workflows/java-standard-main.yml@main
with:
project-dir: pcgroups
secrets: inherit
diff --git a/.github/workflows/pcg-pr.yml b/.github/workflows/pcg-pr.yml
index 0d2c8ff..8032301 100644
--- a/.github/workflows/pcg-pr.yml
+++ b/.github/workflows/pcg-pr.yml
@@ -8,7 +8,7 @@ on:
jobs:
build:
- uses: ./.github/workflows/workflow-pr.yml
+ uses: synadia-io/workflows/.github/workflows/java-standard-pr.yml@main
with:
project-dir: pcgroups
secrets: inherit
diff --git a/.github/workflows/pcg-release.yml b/.github/workflows/pcg-release.yml
index 2f64eda..97ea73f 100644
--- a/.github/workflows/pcg-release.yml
+++ b/.github/workflows/pcg-release.yml
@@ -6,7 +6,7 @@ on:
jobs:
build:
- uses: ./.github/workflows/workflow-release.yml
+ uses: synadia-io/workflows/.github/workflows/java-standard-release.yml@main
with:
project-dir: pcgroups
secrets: inherit
diff --git a/.github/workflows/pcgcli-main.yml b/.github/workflows/pcgcli-main.yml
new file mode 100644
index 0000000..dfb6d30
--- /dev/null
+++ b/.github/workflows/pcgcli-main.yml
@@ -0,0 +1,46 @@
+name: Partitioned Consumer Groups Main Snapshot
+
+on:
+ push:
+ branches:
+ - main
+ paths:
+ - 'pcgroups-cli/**'
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ working-directory: ./pcgroups-cli
+ steps:
+ - name: Set up JDK
+ uses: actions/setup-java@v5
+ with:
+ java-version: '21'
+ distribution: 'temurin'
+ - name: Check out code
+ uses: actions/checkout@v4
+ - name: Build with Gradle
+ run: |
+ chmod +x gradlew && ./gradlew clean test uberJar
+ - name: Build with Maven
+ run: mvn clean package
+ - name: Validate Gradle Uber was created
+ run: |
+ FILE_PATH="build/libs/cg.jar"
+ if [ -f "$FILE_PATH" ]; then
+ echo "Validation successful: $FILE_PATH was created."
+ else
+ echo "Validation failed: $FILE_PATH was not found."
+ exit 1 # Fails the workflow step
+ fi
+ - name: Validate Maven Uber was created
+ run: |
+ FILE_PATH="target/cg.jar"
+ if [ -f "$FILE_PATH" ]; then
+ echo "Validation successful: $FILE_PATH was created."
+ else
+ echo "Validation failed: $FILE_PATH was not found."
+ exit 1 # Fails the workflow step
+ fi
diff --git a/.github/workflows/pcgcli-pr.yml b/.github/workflows/pcgcli-pr.yml
new file mode 100644
index 0000000..d5d243a
--- /dev/null
+++ b/.github/workflows/pcgcli-pr.yml
@@ -0,0 +1,45 @@
+name: Partitioned Consumer Groups Pull Request
+
+on:
+ pull_request:
+ types: [opened, synchronize, reopened]
+ paths:
+ - 'pcgroups-cli/**'
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ working-directory: ./pcgroups-cli
+ steps:
+ - name: Set up JDK
+ uses: actions/setup-java@v5
+ with:
+ java-version: '21'
+ distribution: 'temurin'
+ - name: Check out code
+ uses: actions/checkout@v4
+ - name: Build with Gradle
+ run: |
+ chmod +x gradlew && ./gradlew clean test uberJar
+ - name: Build with Maven
+ run: mvn clean package
+ - name: Validate Gradle Uber was created
+ run: |
+ FILE_PATH="build/libs/cg.jar"
+ if [ -f "$FILE_PATH" ]; then
+ echo "Validation successful: $FILE_PATH was created."
+ else
+ echo "Validation failed: $FILE_PATH was not found."
+ exit 1 # Fails the workflow step
+ fi
+ - name: Validate Maven Uber was created
+ run: |
+ FILE_PATH="target/cg.jar"
+ if [ -f "$FILE_PATH" ]; then
+ echo "Validation successful: $FILE_PATH was created."
+ else
+ echo "Validation failed: $FILE_PATH was not found."
+ exit 1 # Fails the workflow step
+ fi
diff --git a/pcgroups-cli/build.gradle b/pcgroups-cli/build.gradle
new file mode 100644
index 0000000..93a4161
--- /dev/null
+++ b/pcgroups-cli/build.gradle
@@ -0,0 +1,71 @@
+plugins {
+ id("java")
+ id("java-library")
+ id("maven-publish")
+ id("jacoco")
+ id("biz.aQute.bnd.builder") version "7.1.0"
+ id("org.gradle.test-retry") version "1.6.4"
+ id("io.github.gradle-nexus.publish-plugin") version "2.0.0"
+ id("signing")
+}
+
+group = 'io.synadia'
+version = "0.1.0"
+def originalUber = 'pcg-cli-' + version + '-uber.jar'
+System.out.println(originalUber)
+
+java {
+ sourceCompatibility = JavaVersion.VERSION_1_8
+}
+
+repositories {
+ mavenCentral()
+ mavenLocal()
+ maven { url="https://repo1.maven.org/maven2/" }
+ maven { url="https://central.sonatype.com/repository/maven-snapshots" }
+}
+
+dependencies {
+ implementation 'io.nats:jnats:2.25.1'
+ implementation 'org.jspecify:jspecify:1.0.0'
+ implementation 'io.synadia:pcgroups:0.1.0-SNAPSHOT'
+ implementation 'info.picocli:picocli:4.7.5'
+
+ testImplementation 'io.nats:jnats-server-runner:3.1.0'
+ testImplementation 'org.junit.jupiter:junit-jupiter:5.14.1'
+ testImplementation 'org.junit.platform:junit-platform-launcher:1.14.3'
+}
+
+tasks.register('copyToLib', Copy) {
+ into "build/libs"
+ from configurations.runtimeClasspath
+}
+
+tasks.register('packageUberJar', Jar) {
+ archiveClassifier = 'uber'
+
+ from sourceSets.main.output
+
+ dependsOn configurations.runtimeClasspath
+ from {
+ configurations.runtimeClasspath.findAll { it.name.endsWith('jar') }.collect { zipTree(it) }
+ }
+
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+ dependsOn copyToLib
+}
+
+tasks.register('uberJar', Copy) {
+ dependsOn packageUberJar
+
+ // we want the file to be called cg.jar
+ from ('build/libs')
+ include originalUber
+ destinationDir file('build/libs/')
+ rename originalUber, "cg.jar"
+
+ // this task actually does a copy, so this part removes the original
+ doLast {
+ delete('build/libs/' + originalUber)
+ }
+}
diff --git a/pcgroups-cli/gradle/libs.versions.toml b/pcgroups-cli/gradle/libs.versions.toml
new file mode 100644
index 0000000..2cfe86a
--- /dev/null
+++ b/pcgroups-cli/gradle/libs.versions.toml
@@ -0,0 +1,12 @@
+# This file was generated by the Gradle 'init' task.
+# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format
+
+[versions]
+commons-math3 = "3.6.1"
+guava = "33.4.5-jre"
+junit-jupiter = "5.12.1"
+
+[libraries]
+commons-math3 = { module = "org.apache.commons:commons-math3", version.ref = "commons-math3" }
+guava = { module = "com.google.guava:guava", version.ref = "guava" }
+junit-jupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit-jupiter" }
diff --git a/pcgroups-cli/gradle/wrapper/gradle-wrapper.jar b/pcgroups-cli/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..1b33c55
Binary files /dev/null and b/pcgroups-cli/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/pcgroups-cli/gradle/wrapper/gradle-wrapper.properties b/pcgroups-cli/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..ca025c8
--- /dev/null
+++ b/pcgroups-cli/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,7 @@
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
+networkTimeout=10000
+validateDistributionUrl=true
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/pcgroups-cli/gradlew b/pcgroups-cli/gradlew
new file mode 100644
index 0000000..23d15a9
--- /dev/null
+++ b/pcgroups-cli/gradlew
@@ -0,0 +1,251 @@
+#!/bin/sh
+
+#
+# Copyright © 2015-2021 the original authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+##############################################################################
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
+##############################################################################
+
+# Attempt to set APP_HOME
+
+# Resolve links: $0 may be a link
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
+done
+
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD=maximum
+
+warn () {
+ echo "$*"
+} >&2
+
+die () {
+ echo
+ echo "$*"
+ echo
+ exit 1
+} >&2
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
+esac
+
+CLASSPATH="\\\"\\\""
+
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD=$JAVA_HOME/jre/sh/java
+ else
+ JAVACMD=$JAVA_HOME/bin/java
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD=java
+ if ! command -v java >/dev/null 2>&1
+ then
+ die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+fi
+
+# Increase the maximum file descriptors if we can.
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC2039,SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
+fi
+
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
+
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
+ fi
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
+ done
+fi
+
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Collect all arguments for the java command:
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+# and any embedded shellness will be escaped.
+# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+# treated as '${Hostname}' itself on the command line.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
+
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
+
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
+
+exec "$JAVACMD" "$@"
diff --git a/pcgroups-cli/gradlew.bat b/pcgroups-cli/gradlew.bat
new file mode 100644
index 0000000..db3a6ac
--- /dev/null
+++ b/pcgroups-cli/gradlew.bat
@@ -0,0 +1,94 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+@rem SPDX-License-Identifier: Apache-2.0
+@rem
+
+@if "%DEBUG%"=="" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+set DIRNAME=%~dp0
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if %ERRORLEVEL% equ 0 goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto execute
+
+echo. 1>&2
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
+echo. 1>&2
+echo Please set the JAVA_HOME variable in your environment to match the 1>&2
+echo location of your Java installation. 1>&2
+
+goto fail
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=
+
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
+
+:end
+@rem End local scope for the variables with windows NT shell
+if %ERRORLEVEL% equ 0 goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/pcgroups/cli/pom.xml b/pcgroups-cli/pom.xml
similarity index 90%
rename from pcgroups/cli/pom.xml
rename to pcgroups-cli/pom.xml
index f31a38f..9091eaf 100644
--- a/pcgroups/cli/pom.xml
+++ b/pcgroups-cli/pom.xml
@@ -19,7 +19,7 @@
4.0.0
io.synadia
- cg-cli
+ pcg-cli
0.1.0
jar
@@ -27,20 +27,19 @@
CLI tool for managing NATS JetStream Partitioned Consumer Groups
- 11
- 11
+ 1.8
UTF-8
+ 0.1.0-SNAPSHOT
4.7.5
2.25.1
- 2.11.0
io.synadia
- partitioned-consumer-groups
- 0.1.0
+ pcgroups
+ ${pcgroups.version}
@@ -50,13 +49,6 @@
${jnats.version}
-
-
- com.google.code.gson
- gson
- ${gson.version}
-
-
info.picocli
diff --git a/pcgroups-cli/settings.gradle b/pcgroups-cli/settings.gradle
new file mode 100644
index 0000000..ce87931
--- /dev/null
+++ b/pcgroups-cli/settings.gradle
@@ -0,0 +1,13 @@
+pluginManagement {
+ repositories {
+ gradlePluginPortal()
+ mavenCentral()
+ maven { url="https://repo1.maven.org/maven2/" }
+ maven { url="https://central.sonatype.com/repository/maven-snapshots/" }
+ maven { url="https://plugins.gradle.org/m2/" }
+ }
+ plugins {
+ id("biz.aQute.bnd.builder") version "7.1.0"
+ }
+}
+rootProject.name = 'pcg-cli'
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/CgCommand.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CgCommand.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/CgCommand.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/CgCommand.java
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/CliUtils.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/CliUtils.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/CliUtils.java
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/DurationConverter.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/DurationConverter.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/DurationConverter.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/DurationConverter.java
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/ElasticCommands.java
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/PromptHandler.java
diff --git a/pcgroups/cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java b/pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java
similarity index 100%
rename from pcgroups/cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java
rename to pcgroups-cli/src/main/java/io/synadia/pcg/cli/StaticCommands.java
diff --git a/pcgroups/LICENSE b/pcgroups/LICENSE
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/pcgroups/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/pcgroups/NOTICE b/pcgroups/NOTICE
new file mode 100644
index 0000000..bd2b8ad
--- /dev/null
+++ b/pcgroups/NOTICE
@@ -0,0 +1,5 @@
+Orbit Java
+Copyright (c) 2024-2025 Synadia Communications Inc. All Rights Reserved.
+
+This product includes software developed at
+Synadia Communications Inc.
diff --git a/pcgroups/README.md b/pcgroups/README.md
index 8eba332..331021d 100644
--- a/pcgroups/README.md
+++ b/pcgroups/README.md
@@ -1,9 +1,6 @@
-# Partitioned Consumer Groups
-
-[License-Url]: https://www.apache.org/licenses/LICENSE-2.0
-[License-Image]: https://img.shields.io/badge/License-Apache2-blue.svg
+
-[![License][License-Image]][License-Url]
+# Partitioned Consumer Groups
Initial implementation of a client-side partitioned consumer group feature for NATS streams leveraging some of the new features introduced in `nats-server` version 2.11.
@@ -101,3 +98,7 @@ You can look at the `cg` CLI tool's source code for examples of how to create an
# Requirements
Partitioned consumer groups require NATS server version 2.11 or above.
+
+---
+Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
+See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details.
diff --git a/pcgroups/build.gradle b/pcgroups/build.gradle
index 383c6fe..75f3332 100644
--- a/pcgroups/build.gradle
+++ b/pcgroups/build.gradle
@@ -19,7 +19,7 @@ def isRelease = System.getenv("BUILD_EVENT") == "release"
def tc = System.getenv("TARGET_COMPATIBILITY");
def targetCompat = tc == "21" ? JavaVersion.VERSION_21 : (tc == "17" ? JavaVersion.VERSION_17 : JavaVersion.VERSION_1_8)
def jarEnd = tc == "21" ? "-jdk21" : (tc == "17" ? "-jdk17" : "")
-def jarAndArtifactName = "partitioned-consumer-groups" + jarEnd
+def jarAndArtifactName = "pcgroups" + jarEnd
version = isRelease ? jarVersion : jarVersion + "-SNAPSHOT" // version is the variable the build actually uses.
@@ -35,28 +35,14 @@ repositories {
}
dependencies {
- implementation 'io.nats:jnats:2.25.1'
- implementation 'com.google.code.gson:gson:2.11.0'
- implementation 'org.jspecify:jspecify:1.0.0'
+ api 'io.nats:jnats:2.25.1'
+ api 'org.jspecify:jspecify:1.0.0'
testImplementation 'io.nats:jnats-server-runner:3.1.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.14.1'
testImplementation 'org.junit.platform:junit-platform-launcher:1.14.3'
}
-sourceSets {
- main {
- java {
- srcDirs = ['src/main/java']
- }
- }
- test {
- java {
- srcDirs = ['src/test/java']
- }
- }
-}
-
tasks.register('bundle', Bundle) {
from sourceSets.main.output
}
@@ -66,28 +52,18 @@ jar {
bnd("Bundle-Name": "io.synadia.partitioned.consumer.groups",
"Bundle-Vendor": "synadia.io",
"Bundle-Description": "NATS JetStream Partitioned Consumer Groups Library for Java",
- "Bundle-DocURL": "https://synadia.io"
+ "Bundle-DocURL": "https://github.com/synadia-io/orbit.java/tree/main/pcgroups",
+ "Target-Compatibility": "Java " + targetCompat
)
}
}
test {
- // Use junit platform for unit tests
useJUnitPlatform()
- testLogging {
- exceptionFormat = 'full'
- events "started", "passed", "skipped", "failed"
- showStandardStreams = true
- }
- retry {
- failOnPassedAfterRetry = false
- maxFailures = 3
- maxRetries = 3
- }
- systemProperty 'junit.jupiter.execution.timeout.default', '3m'
}
javadoc {
+ options.overview = 'src/main/javadoc/overview.html' // relative to source root
source = sourceSets.main.allJava
title = "Synadia Communications Inc. NATS JetStream Partitioned Consumer Groups"
classpath = sourceSets.main.runtimeClasspath
@@ -103,13 +79,8 @@ tasks.register('sourcesJar', Jar) {
from sourceSets.main.allSource
}
-tasks.register('testsJar', Jar) {
- archiveClassifier.set('tests')
- from sourceSets.test.allSource
-}
-
artifacts {
- archives javadocJar, sourcesJar, testsJar
+ archives javadocJar, sourcesJar
}
jacoco {
@@ -146,10 +117,9 @@ publishing {
from components.java
artifact sourcesJar
artifact javadocJar
- artifact testsJar
pom {
name = jarAndArtifactName
- packaging = "jar"
+ packaging = 'jar'
groupId = group
artifactId = jarAndArtifactName
description = "Synadia Communications Inc. NATS JetStream Partitioned Consumer Groups"
diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java
index b26a668..a798323 100644
--- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java
+++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java
@@ -13,15 +13,12 @@
package io.synadia.pcg;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.Headers;
import io.synadia.pcg.exceptions.ConsumerGroupException;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -41,7 +38,6 @@
public class ElasticConsumerGroup {
private static final Logger LOGGER = Logger.getLogger(ElasticConsumerGroup.class.getName());
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
private ElasticConsumerGroup() {
// Utility class
@@ -56,17 +52,17 @@ private ElasticConsumerGroup() {
* @param maxMembers Maximum number of members (partitions)
* @param filter Subject filter with wildcards
* @param partitioningWildcards Indexes of wildcards to use for partitioning
- * @param maxBufferedMsgs Max messages in work queue (0 for unlimited)
+ * @param maxBufferedMessages Max messages in work queue (0 for unlimited)
* @param maxBufferedBytes Max bytes in work queue (0 for unlimited)
* @return The created configuration
*/
public static ElasticConsumerGroupConfig create(Connection nc, String streamName, String consumerGroupName,
int maxMembers, String filter, int[] partitioningWildcards,
- long maxBufferedMsgs, long maxBufferedBytes)
+ long maxBufferedMessages, long maxBufferedBytes)
throws ConsumerGroupException, IOException, JetStreamApiException, InterruptedException {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
- maxMembers, filter, partitioningWildcards, maxBufferedMsgs, maxBufferedBytes,
+ maxMembers, filter, partitioningWildcards, maxBufferedMessages, maxBufferedBytes,
new ArrayList<>(), new ArrayList<>());
config.validate();
@@ -96,27 +92,22 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName
String key = composeKey(streamName, consumerGroupName);
// Check if config already exists
- KeyValueEntry entry = kv.get(key);
- if (entry != null) {
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- ElasticConsumerGroupConfig existingConfig = GSON.fromJson(json, ElasticConsumerGroupConfig.class);
-
- // Verify the config matches
+ ElasticConsumerGroupConfig existingConfig = ElasticConsumerGroupConfig.instance(kv.get(key));
+ if (existingConfig != null) {
if (existingConfig.getMaxMembers() != maxMembers ||
- !Objects.equals(existingConfig.getFilter(), filter) ||
- existingConfig.getMaxBufferedMsgs() != maxBufferedMsgs ||
- existingConfig.getMaxBufferedBytes() != maxBufferedBytes ||
- !Arrays.equals(existingConfig.getPartitioningWildcards(), partitioningWildcards)) {
+ !Objects.equals(existingConfig.getFilter(), filter) ||
+ existingConfig.getMaxBufferedMessages() != maxBufferedMessages ||
+ existingConfig.getMaxBufferedBytes() != maxBufferedBytes ||
+ !Arrays.equals(existingConfig.getPartitioningWildcards(), partitioningWildcards)) {
throw new ConsumerGroupException(
- "the existing elastic consumer group config can not be updated to the requested one, " +
- "please delete the existing elastic consumer group and create a new one");
+ "the existing elastic consumer group config can not be updated to the requested one, " +
+ "please delete the existing elastic consumer group and create a new one");
}
return existingConfig;
}
// Create the config entry
- String payload = GSON.toJson(config);
- kv.put(key, payload.getBytes(StandardCharsets.UTF_8));
+ kv.put(key, config.serialize());
// Create the work queue stream with subject transform
String workQueueStreamName = composeCGSName(streamName, consumerGroupName);
@@ -130,8 +121,8 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName
.discardPolicy(DiscardPolicy.New)
.allowDirect(true);
- if (maxBufferedMsgs > 0) {
- scBuilder.maxMessages(maxBufferedMsgs);
+ if (maxBufferedMessages > 0) {
+ scBuilder.maxMessages(maxBufferedMessages);
}
if (maxBufferedBytes > 0) {
scBuilder.maxBytes(maxBufferedBytes);
@@ -305,9 +296,8 @@ public static List addMembers(Connection nc, String streamName, String c
List newMembers = new ArrayList<>(existingMembers);
config.setMembers(newMembers);
- String payload = GSON.toJson(config);
String key = composeKey(streamName, consumerGroupName);
- kv.update(key, payload.getBytes(StandardCharsets.UTF_8), config.getRevision());
+ kv.update(key, config.serialize(), config.getRevision());
return newMembers;
}
@@ -350,9 +340,8 @@ public static List deleteMembers(Connection nc, String streamName, Strin
config.setMembers(newMembers);
- String payload = GSON.toJson(config);
String key = composeKey(streamName, consumerGroupName);
- kv.update(key, payload.getBytes(StandardCharsets.UTF_8), config.getRevision());
+ kv.update(key, config.serialize(), config.getRevision());
return newMembers;
}
@@ -382,9 +371,8 @@ public static void setMemberMappings(Connection nc, String streamName, String co
config.setMemberMappings(memberMappings);
config.validate();
- String payload = GSON.toJson(config);
String key = composeKey(streamName, consumerGroupName);
- kv.put(key, payload.getBytes(StandardCharsets.UTF_8));
+ kv.put(key, config.serialize());
}
/**
@@ -408,9 +396,8 @@ public static void deleteMemberMappings(Connection nc, String streamName, String
config.setMemberMappings(new ArrayList<>());
- String payload = GSON.toJson(config);
String key = composeKey(streamName, consumerGroupName);
- kv.put(key, payload.getBytes(StandardCharsets.UTF_8));
+ kv.put(key, config.serialize());
}
/**
@@ -538,17 +525,11 @@ private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String st
}
String key = composeKey(streamName, consumerGroupName);
- KeyValueEntry entry = kv.get(key);
-
- if (entry == null) {
+ ElasticConsumerGroupConfig config = ElasticConsumerGroupConfig.instance(kv.get(key));
+ if (config == null) {
throw new ConsumerGroupException("error getting the elastic consumer group's config: not found");
}
-
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- ElasticConsumerGroupConfig config = GSON.fromJson(json, ElasticConsumerGroupConfig.class);
- config.setRevision(entry.getRevision());
config.validate();
-
return config;
}
@@ -730,14 +711,13 @@ private void processWatcherUpdate(KeyValueEntry entry) {
}
try {
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- ElasticConsumerGroupConfig newConfig = GSON.fromJson(json, ElasticConsumerGroupConfig.class);
+ ElasticConsumerGroupConfig newConfig = ElasticConsumerGroupConfig.instance(entry);
newConfig.validate();
// Check if critical config changed (immutable fields)
if (newConfig.getMaxMembers() != config.getMaxMembers() ||
!Objects.equals(newConfig.getFilter(), config.getFilter()) ||
- newConfig.getMaxBufferedMsgs() != config.getMaxBufferedMsgs() ||
+ newConfig.getMaxBufferedMessages() != config.getMaxBufferedMessages() ||
newConfig.getMaxBufferedBytes() != config.getMaxBufferedBytes() ||
!Arrays.equals(newConfig.getPartitioningWildcards(), config.getPartitioningWildcards())) {
stopConsuming();
diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java
index 29e3c21..8c66dc2 100644
--- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java
+++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java
@@ -13,41 +13,59 @@
package io.synadia.pcg;
-import com.google.gson.annotations.SerializedName;
+import io.nats.client.api.KeyValueEntry;
+import io.nats.client.support.*;
import io.synadia.pcg.exceptions.ConsumerGroupException;
+import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
import java.util.*;
+import static io.nats.client.support.JsonUtils.*;
+import static io.nats.client.support.JsonValueUtils.*;
+
/**
* Configuration for an elastic consumer group.
* JSON structure must be compatible with the Go version.
*/
-public class ElasticConsumerGroupConfig {
+public class ElasticConsumerGroupConfig implements JsonSerializable {
+ static final String MAX_MEMBERS = "max_members";
+ static final String FILTER = "filter";
+ static final String PARTITIONING_WILDCARDS = "partitioning_wildcards";
+ static final String MAX_BUFFERED_MSG = "max_buffered_msg";
+ static final String MAX_BUFFERED_BYTES = "max_buffered_bytes";
+ static final String MEMBERS = "members";
+ static final String MEMBER_MAPPINGS = "member_mappings";
- @SerializedName("max_members")
private int maxMembers;
-
- @SerializedName("filter")
private String filter;
-
- @SerializedName("partitioning_wildcards")
private int[] partitioningWildcards;
-
- @SerializedName("max_buffered_msg")
- private long maxBufferedMsgs;
-
- @SerializedName("max_buffered_bytes")
+ private long maxBufferedMessages;
private long maxBufferedBytes;
-
- @SerializedName("members")
private List members;
-
- @SerializedName("member_mappings")
private List memberMappings;
// Internal revision number, not serialized
private transient long revision;
+ @Nullable
+ public static ElasticConsumerGroupConfig instance(KeyValueEntry entry) throws JsonParseException {
+ if (entry != null) {
+ byte[] json = entry.getValue();
+ if (json != null) {
+ ElasticConsumerGroupConfig config = instance(json);
+ config.setRevision(entry.getRevision());
+ return config;
+ }
+ }
+ return null;
+ }
+
+ @NonNull
+ public static ElasticConsumerGroupConfig instance(byte @NonNull[] json) throws JsonParseException {
+ return new ElasticConsumerGroupConfig(JsonParser.parse(json));
+ }
+
public ElasticConsumerGroupConfig() {
this.partitioningWildcards = new int[0];
this.members = new ArrayList<>();
@@ -55,15 +73,30 @@ public ElasticConsumerGroupConfig() {
}
public ElasticConsumerGroupConfig(int maxMembers, String filter, int[] partitioningWildcards,
- long maxBufferedMsgs, long maxBufferedBytes,
+ long maxBufferedMessages, long maxBufferedBytes,
List members, List memberMappings) {
this.maxMembers = maxMembers;
this.filter = filter;
this.partitioningWildcards = partitioningWildcards != null ? partitioningWildcards.clone() : new int[0];
- this.maxBufferedMsgs = maxBufferedMsgs;
+ this.maxBufferedMessages = maxBufferedMessages;
this.maxBufferedBytes = maxBufferedBytes;
- this.members = members != null ? new ArrayList<>(members) : new ArrayList<>();
- this.memberMappings = memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ this.members = members == null ? new ArrayList<>() : new ArrayList<>(members);
+ this.memberMappings = memberMappings == null ? new ArrayList<>() : new ArrayList<>(memberMappings);
+ }
+
+ public ElasticConsumerGroupConfig(JsonValue jv) {
+ this.maxMembers = JsonValueUtils.readInteger(jv, MAX_MEMBERS, 0);
+ this.filter = JsonValueUtils.readString(jv, FILTER);
+ List integers = read(jv, PARTITIONING_WILDCARDS, v -> listOf(v, JsonValueUtils::getInteger));
+ this.partitioningWildcards = new int[integers.size()];
+ for (int x = 0; x < integers.size(); x++) {
+ Integer i = integers.get(x);
+ this.partitioningWildcards[x] = i == null ? 0 : i;
+ }
+ this.maxBufferedMessages = JsonValueUtils.readLong(jv, MAX_BUFFERED_MSG, 0);
+ this.maxBufferedBytes = JsonValueUtils.readLong(jv, MAX_BUFFERED_BYTES, 0);
+ this.members = JsonValueUtils.readStringList(jv, MEMBERS);
+ this.memberMappings = MemberMapping.listOfOrEmptyList(readValue(jv, MEMBER_MAPPINGS));
}
public int getMaxMembers() {
@@ -83,19 +116,19 @@ public void setFilter(String filter) {
}
public int[] getPartitioningWildcards() {
- return partitioningWildcards != null ? partitioningWildcards.clone() : new int[0];
+ return partitioningWildcards.clone();
}
public void setPartitioningWildcards(int[] partitioningWildcards) {
- this.partitioningWildcards = partitioningWildcards != null ? partitioningWildcards.clone() : new int[0];
+ this.partitioningWildcards = partitioningWildcards == null ? new int[0] : partitioningWildcards.clone();
}
- public long getMaxBufferedMsgs() {
- return maxBufferedMsgs;
+ public long getMaxBufferedMessages() {
+ return maxBufferedMessages;
}
- public void setMaxBufferedMsgs(long maxBufferedMsgs) {
- this.maxBufferedMsgs = maxBufferedMsgs;
+ public void setMaxBufferedMessages(long maxBufferedMessages) {
+ this.maxBufferedMessages = maxBufferedMessages;
}
public long getMaxBufferedBytes() {
@@ -111,15 +144,15 @@ public List getMembers() {
}
public void setMembers(List members) {
- this.members = members != null ? new ArrayList<>(members) : new ArrayList<>();
+ this.members = members == null ? new ArrayList<>() : new ArrayList<>(members);
}
public List getMemberMappings() {
- return memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ return new ArrayList<>(memberMappings);
}
public void setMemberMappings(List memberMappings) {
- this.memberMappings = memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ this.memberMappings = memberMappings == null ? new ArrayList<>() : new ArrayList<>(memberMappings);
}
public long getRevision() {
@@ -134,17 +167,14 @@ public void setRevision(long revision) {
* Checks if the given member name is in the current membership.
*/
public boolean isInMembership(String name) {
- if (memberMappings != null && !memberMappings.isEmpty()) {
+ if (!memberMappings.isEmpty()) {
for (MemberMapping mapping : memberMappings) {
if (mapping.getMember().equals(name)) {
return true;
}
}
}
- if (members != null && !members.isEmpty()) {
- return members.contains(name);
- }
- return false;
+ return members.contains(name);
}
/**
@@ -192,8 +222,8 @@ public void validate() throws ConsumerGroupException {
}
// Validate that only one of members or member mappings is provided
- boolean hasMembers = members != null && !members.isEmpty();
- boolean hasMemberMappings = memberMappings != null && !memberMappings.isEmpty();
+ boolean hasMembers = !members.isEmpty();
+ boolean hasMemberMappings = !memberMappings.isEmpty();
if (hasMembers && hasMemberMappings) {
throw new ConsumerGroupException("either members or member mappings must be provided, not both");
@@ -201,7 +231,7 @@ public void validate() throws ConsumerGroupException {
// Validate member mappings
if (hasMemberMappings) {
- if (memberMappings.size() < 1 || memberMappings.size() > maxMembers) {
+ if (memberMappings.size() > maxMembers) {
throw new ConsumerGroupException("the number of member mappings must be between 1 and the max number of members");
}
@@ -257,13 +287,33 @@ public String getPartitioningTransformDest() {
return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter;
}
+ @Override
+ @NonNull
+ public String toJson() {
+ StringBuilder sb = beginJson();
+ addField(sb, MAX_MEMBERS, maxMembers);
+ addField(sb, FILTER, filter);
+ if (partitioningWildcards.length > 0) {
+ List integers = new ArrayList<>(partitioningWildcards.length);
+ for (int i : partitioningWildcards) {
+ integers.add(i);
+ }
+ _addList(sb, PARTITIONING_WILDCARDS, integers, StringBuilder::append);
+ }
+ addField(sb, MAX_BUFFERED_MSG, maxBufferedMessages);
+ addField(sb, MAX_BUFFERED_BYTES, maxBufferedBytes);
+ addStrings(sb, MEMBERS, members);
+ addJsons(sb, MEMBER_MAPPINGS, memberMappings);
+ return endJson(sb).toString();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ElasticConsumerGroupConfig that = (ElasticConsumerGroupConfig) o;
return maxMembers == that.maxMembers &&
- maxBufferedMsgs == that.maxBufferedMsgs &&
+ maxBufferedMessages == that.maxBufferedMessages &&
maxBufferedBytes == that.maxBufferedBytes &&
Objects.equals(filter, that.filter) &&
Arrays.equals(partitioningWildcards, that.partitioningWildcards) &&
@@ -273,7 +323,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- int result = Objects.hash(maxMembers, filter, maxBufferedMsgs, maxBufferedBytes, members, memberMappings);
+ int result = Objects.hash(maxMembers, filter, maxBufferedMessages, maxBufferedBytes, members, memberMappings);
result = 31 * result + Arrays.hashCode(partitioningWildcards);
return result;
}
@@ -284,7 +334,7 @@ public String toString() {
"maxMembers=" + maxMembers +
", filter='" + filter + '\'' +
", partitioningWildcards=" + Arrays.toString(partitioningWildcards) +
- ", maxBufferedMsgs=" + maxBufferedMsgs +
+ ", maxBufferedMessages=" + maxBufferedMessages +
", maxBufferedBytes=" + maxBufferedBytes +
", members=" + members +
", memberMappings=" + memberMappings +
diff --git a/pcgroups/src/main/java/io/synadia/pcg/MemberMapping.java b/pcgroups/src/main/java/io/synadia/pcg/MemberMapping.java
index adca2f3..e3b6652 100644
--- a/pcgroups/src/main/java/io/synadia/pcg/MemberMapping.java
+++ b/pcgroups/src/main/java/io/synadia/pcg/MemberMapping.java
@@ -13,30 +13,67 @@
package io.synadia.pcg;
-import com.google.gson.annotations.SerializedName;
+import io.nats.client.support.JsonSerializable;
+import io.nats.client.support.JsonValue;
+import io.nats.client.support.JsonValueUtils;
+import org.jspecify.annotations.NonNull;
+
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
+import static io.nats.client.support.JsonUtils.*;
+import static io.nats.client.support.JsonValueUtils.*;
+import static io.nats.client.support.JsonValueUtils.readString;
+
/**
* Represents a mapping between a member name and its assigned partitions.
* JSON structure must be compatible with the Go version.
*/
-public class MemberMapping {
+public class MemberMapping implements JsonSerializable {
+ static final String MEMBER = "member";
+ static final String PARTITIONS = "partitions";
- @SerializedName("member")
private String member;
-
- @SerializedName("partitions")
private int[] partitions;
- public MemberMapping() {
+ static List listOfOrEmptyList(JsonValue jv) {
+ return JsonValueUtils.listOf(jv, MemberMapping::new);
}
+ public MemberMapping() {}
+
public MemberMapping(String member, int[] partitions) {
this.member = member;
this.partitions = partitions != null ? partitions.clone() : new int[0];
}
+ public MemberMapping(JsonValue jv) {
+ this.member = readString(jv, MEMBER);
+ List integers = read(jv, PARTITIONS, v -> listOf(v, JsonValueUtils::getInteger));
+ this.partitions = new int[integers.size()];
+ for (int x = 0; x < integers.size(); x++) {
+ Integer i = integers.get(x);
+ this.partitions[x] = i == null ? 0 : i;
+ }
+ }
+
+ @Override
+ @NonNull
+ public String toJson() {
+ StringBuilder sb = beginJson();
+ addField(sb, MEMBER, member);
+ if (partitions.length > 0) {
+ List integers = new ArrayList<>(partitions.length);
+ for (int i : partitions) {
+ integers.add(i);
+ }
+ _addList(sb, PARTITIONS, integers, StringBuilder::append);
+ }
+ return endJson(sb).toString();
+ }
+
public String getMember() {
return member;
}
diff --git a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java
index a9f5d15..099e7b5 100644
--- a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java
+++ b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java
@@ -13,15 +13,12 @@
package io.synadia.pcg;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.Headers;
import io.synadia.pcg.exceptions.ConsumerGroupException;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -40,7 +37,6 @@
public class StaticConsumerGroup {
private static final Logger LOGGER = Logger.getLogger(StaticConsumerGroup.class.getName());
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
private StaticConsumerGroup() {
// Utility class
@@ -89,10 +85,8 @@ public static StaticConsumerGroupConfig create(Connection nc, String streamName,
String key = composeKey(streamName, consumerGroupName);
// Check if config already exists
- KeyValueEntry entry = kv.get(key);
- if (entry != null) {
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- StaticConsumerGroupConfig existingConfig = GSON.fromJson(json, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig existingConfig = StaticConsumerGroupConfig.instance(kv.get(key));
+ if (existingConfig != null) {
// Verify the config matches
if (!configsMatch(existingConfig, config)) {
@@ -102,8 +96,7 @@ public static StaticConsumerGroupConfig create(Connection nc, String streamName,
}
// Create the config entry
- String payload = GSON.toJson(config);
- kv.put(key, payload.getBytes(StandardCharsets.UTF_8));
+ kv.put(key, config.serialize());
return config;
}
@@ -299,8 +292,7 @@ private static StaticConsumerGroupConfig getConfigFromKV(KeyValue kv, String str
throw new ConsumerGroupException("error getting the static consumer group's config: not found");
}
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- StaticConsumerGroupConfig config = GSON.fromJson(json, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig config = StaticConsumerGroupConfig.instance(entry);
config.validate();
return config;
@@ -374,7 +366,6 @@ private void joinMemberConsumer() throws IOException, JetStreamApiException, Int
// Create the durable consumer explicitly (matching Go's js.CreateConsumer)
JetStreamManagement jsm = nc.jetStreamManagement();
- System.out.printf("Creating consumer %s with filters %s and priority group %s%n\n", consumerName, filters, PRIORITY_GROUP_NAME);
jsm.createConsumer(streamName, cc);
// Get consumer context and start consuming
@@ -425,8 +416,7 @@ public void watch(KeyValueEntry entry) {
}
try {
- String json = new String(entry.getValue(), StandardCharsets.UTF_8);
- StaticConsumerGroupConfig newConfig = GSON.fromJson(json, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig newConfig = StaticConsumerGroupConfig.instance(entry);
newConfig.validate();
// Check if critical config changed
diff --git a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroupConfig.java
index 3f0ad2d..da46472 100644
--- a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroupConfig.java
+++ b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroupConfig.java
@@ -13,29 +13,48 @@
package io.synadia.pcg;
-import com.google.gson.annotations.SerializedName;
+import io.nats.client.api.KeyValueEntry;
+import io.nats.client.support.*;
import io.synadia.pcg.exceptions.ConsumerGroupException;
+import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
import java.util.*;
+import static io.nats.client.support.JsonUtils.*;
+import static io.nats.client.support.JsonValueUtils.readValue;
+
/**
* Configuration for a static consumer group.
* JSON structure must be compatible with the Go version.
*/
-public class StaticConsumerGroupConfig {
+public class StaticConsumerGroupConfig implements JsonSerializable {
+ static final String MAX_MEMBERS = "max_members";
+ static final String FILTER = "filter";
+ static final String MEMBERS = "members";
+ static final String MEMBER_MAPPINGS = "member_mappings";
- @SerializedName("max_members")
private int maxMembers;
-
- @SerializedName("filter")
private String filter;
-
- @SerializedName("members")
private List members;
-
- @SerializedName("member_mappings")
private List memberMappings;
+ @Nullable
+ public static StaticConsumerGroupConfig instance(KeyValueEntry entry) throws JsonParseException {
+ if (entry != null) {
+ byte[] json = entry.getValue();
+ if (json != null) {
+ return instance(json);
+ }
+ }
+ return null;
+ }
+
+ @NonNull
+ public static StaticConsumerGroupConfig instance(byte @NonNull[] json) throws JsonParseException {
+ return new StaticConsumerGroupConfig(JsonParser.parse(json));
+ }
+
public StaticConsumerGroupConfig() {
this.members = new ArrayList<>();
this.memberMappings = new ArrayList<>();
@@ -44,8 +63,26 @@ public StaticConsumerGroupConfig() {
public StaticConsumerGroupConfig(int maxMembers, String filter, List members, List memberMappings) {
this.maxMembers = maxMembers;
this.filter = filter;
- this.members = members != null ? new ArrayList<>(members) : new ArrayList<>();
- this.memberMappings = memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ this.members = members == null ? new ArrayList<>() : new ArrayList<>(members);
+ this.memberMappings = memberMappings == null ? new ArrayList<>() : new ArrayList<>(memberMappings);
+ }
+
+ public StaticConsumerGroupConfig(JsonValue jv) {
+ this.maxMembers = JsonValueUtils.readInteger(jv, MAX_MEMBERS, 0);
+ this.filter = JsonValueUtils.readString(jv, FILTER);
+ this.members = JsonValueUtils.readStringList(jv, MEMBERS);
+ this.memberMappings = MemberMapping.listOfOrEmptyList(readValue(jv, MEMBER_MAPPINGS));
+ }
+
+ @Override
+ @NonNull
+ public String toJson() {
+ StringBuilder sb = beginJson();
+ addField(sb, MAX_MEMBERS, maxMembers);
+ addField(sb, FILTER, filter);
+ addStrings(sb, MEMBERS, members);
+ addJsons(sb, MEMBER_MAPPINGS, memberMappings);
+ return endJson(sb).toString();
}
public int getMaxMembers() {
@@ -65,36 +102,33 @@ public void setFilter(String filter) {
}
public List getMembers() {
- return members != null ? new ArrayList<>(members) : new ArrayList<>();
+ return new ArrayList<>(members);
}
public void setMembers(List members) {
- this.members = members != null ? new ArrayList<>(members) : new ArrayList<>();
+ this.members = members == null ? new ArrayList<>() : new ArrayList<>(members);
}
public List getMemberMappings() {
- return memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ return new ArrayList<>(memberMappings);
}
public void setMemberMappings(List memberMappings) {
- this.memberMappings = memberMappings != null ? new ArrayList<>(memberMappings) : new ArrayList<>();
+ this.memberMappings = memberMappings == null ? new ArrayList<>() : new ArrayList<>(memberMappings);
}
/**
* Checks if the given member name is in the current membership.
*/
public boolean isInMembership(String name) {
- if (memberMappings != null && !memberMappings.isEmpty()) {
+ if (!memberMappings.isEmpty()) {
for (MemberMapping mapping : memberMappings) {
if (mapping.getMember().equals(name)) {
return true;
}
}
}
- if (members != null && !members.isEmpty()) {
- return members.contains(name);
- }
- return false;
+ return members.contains(name);
}
/**
@@ -109,8 +143,8 @@ public void validate() throws ConsumerGroupException {
}
// Validate that only one of members or member mappings is provided
- boolean hasMembers = members != null && !members.isEmpty();
- boolean hasMemberMappings = memberMappings != null && !memberMappings.isEmpty();
+ boolean hasMembers = !members.isEmpty();
+ boolean hasMemberMappings = !memberMappings.isEmpty();
if (hasMembers && hasMemberMappings) {
throw new ConsumerGroupException("either members or member mappings must be provided, not both");
@@ -118,7 +152,7 @@ public void validate() throws ConsumerGroupException {
// Validate member mappings
if (hasMemberMappings) {
- if (memberMappings.size() < 1 || memberMappings.size() > maxMembers) {
+ if (memberMappings.size() > maxMembers) {
throw new ConsumerGroupException("the number of member mappings must be between 1 and the max number of members");
}
diff --git a/pcgroups/src/main/javadoc/images/favicon.ico b/pcgroups/src/main/javadoc/images/favicon.ico
new file mode 100644
index 0000000..9464855
Binary files /dev/null and b/pcgroups/src/main/javadoc/images/favicon.ico differ
diff --git a/pcgroups/src/main/javadoc/images/large-logo.png b/pcgroups/src/main/javadoc/images/large-logo.png
new file mode 100644
index 0000000..33f9483
Binary files /dev/null and b/pcgroups/src/main/javadoc/images/large-logo.png differ
diff --git a/pcgroups/src/main/javadoc/images/synadia-logo.png b/pcgroups/src/main/javadoc/images/synadia-logo.png
new file mode 100644
index 0000000..1f14bda
Binary files /dev/null and b/pcgroups/src/main/javadoc/images/synadia-logo.png differ
diff --git a/pcgroups/src/main/javadoc/overview.html b/pcgroups/src/main/javadoc/overview.html
new file mode 100644
index 0000000..1bfaaa5
--- /dev/null
+++ b/pcgroups/src/main/javadoc/overview.html
@@ -0,0 +1,13 @@
+
+
+
+
+
+NATS JetStream Partitioned Consumer Groups Library for Java
+
+
+
+
+
diff --git a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java
index e4a9be3..b6c2a59 100644
--- a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java
+++ b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java
@@ -13,15 +13,17 @@
package io.synadia.pcg;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import io.nats.NatsRunnerUtils;
+import io.nats.client.support.JsonParseException;
import io.synadia.pcg.exceptions.ConsumerGroupException;
import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
import static org.junit.jupiter.api.Assertions.*;
@@ -31,7 +33,9 @@
*/
class ElasticConsumerGroupTest {
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
+ static {
+ NatsRunnerUtils.setDefaultOutputLevel(Level.SEVERE);
+ }
@Test
void testConfigBasic() {
@@ -43,7 +47,7 @@ void testConfigBasic() {
assertEquals(4, config.getMaxMembers());
assertEquals("foo.*", config.getFilter());
assertArrayEquals(new int[]{1}, config.getPartitioningWildcards());
- assertEquals(1000, config.getMaxBufferedMsgs());
+ assertEquals(1000, config.getMaxBufferedMessages());
assertEquals(10000, config.getMaxBufferedBytes());
assertEquals(2, config.getMembers().size());
assertTrue(config.getMemberMappings().isEmpty());
@@ -229,13 +233,13 @@ void testGetPartitioningTransformDestPartialWildcards() {
}
@Test
- void testJsonSerializationWithMembers() {
+ void testJsonSerializationWithMembers() throws JsonParseException {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.*", new int[]{1}, 1000, 10000,
Arrays.asList("m1", "m2"), new ArrayList<>()
);
- String json = GSON.toJson(config);
+ String json = config.toJson();
// Verify JSON structure matches Go format
assertTrue(json.contains("\"max_members\":4"));
@@ -246,17 +250,17 @@ void testJsonSerializationWithMembers() {
assertTrue(json.contains("\"members\":[\"m1\",\"m2\"]"));
// Deserialize and verify
- ElasticConsumerGroupConfig deserialized = GSON.fromJson(json, ElasticConsumerGroupConfig.class);
+ ElasticConsumerGroupConfig deserialized = ElasticConsumerGroupConfig.instance(config.serialize());
assertEquals(config.getMaxMembers(), deserialized.getMaxMembers());
assertEquals(config.getFilter(), deserialized.getFilter());
assertArrayEquals(config.getPartitioningWildcards(), deserialized.getPartitioningWildcards());
- assertEquals(config.getMaxBufferedMsgs(), deserialized.getMaxBufferedMsgs());
+ assertEquals(config.getMaxBufferedMessages(), deserialized.getMaxBufferedMessages());
assertEquals(config.getMaxBufferedBytes(), deserialized.getMaxBufferedBytes());
assertEquals(config.getMembers(), deserialized.getMembers());
}
@Test
- void testJsonSerializationWithMappings() {
+ void testJsonSerializationWithMappings() throws JsonParseException {
List mappings = Arrays.asList(
new MemberMapping("alice", new int[]{0, 1}),
new MemberMapping("bob", new int[]{2, 3})
@@ -267,30 +271,30 @@ void testJsonSerializationWithMappings() {
new ArrayList<>(), mappings
);
- String json = GSON.toJson(config);
+ String json = config.toJson();
assertTrue(json.contains("\"member_mappings\""));
assertTrue(json.contains("\"member\":\"alice\""));
assertTrue(json.contains("\"partitions\":[0,1]"));
// Deserialize and verify
- ElasticConsumerGroupConfig deserialized = GSON.fromJson(json, ElasticConsumerGroupConfig.class);
+ ElasticConsumerGroupConfig deserialized = ElasticConsumerGroupConfig.instance(config.serialize());
assertEquals(2, deserialized.getMemberMappings().size());
assertEquals("alice", deserialized.getMemberMappings().get(0).getMember());
assertArrayEquals(new int[]{0, 1}, deserialized.getMemberMappings().get(0).getPartitions());
}
@Test
- void testJsonDeserializationFromGo() {
+ void testJsonDeserializationFromGo() throws JsonParseException {
// This JSON is in the format produced by the Go implementation
String goJson = "{\"max_members\":4,\"filter\":\"foo.*\",\"partitioning_wildcards\":[1],\"max_buffered_msg\":1000,\"max_buffered_bytes\":10000,\"members\":[\"m1\",\"m2\"]}";
- ElasticConsumerGroupConfig config = GSON.fromJson(goJson, ElasticConsumerGroupConfig.class);
+ ElasticConsumerGroupConfig config = ElasticConsumerGroupConfig.instance(goJson.getBytes(StandardCharsets.UTF_8));
assertEquals(4, config.getMaxMembers());
assertEquals("foo.*", config.getFilter());
assertArrayEquals(new int[]{1}, config.getPartitioningWildcards());
- assertEquals(1000, config.getMaxBufferedMsgs());
+ assertEquals(1000, config.getMaxBufferedMessages());
assertEquals(10000, config.getMaxBufferedBytes());
assertEquals(2, config.getMembers().size());
assertEquals("m1", config.getMembers().get(0));
@@ -298,11 +302,11 @@ void testJsonDeserializationFromGo() {
}
@Test
- void testJsonDeserializationWithMappingsFromGo() {
+ void testJsonDeserializationWithMappingsFromGo() throws JsonParseException {
// This JSON is in the format produced by the Go implementation
String goJson = "{\"max_members\":4,\"filter\":\"bar.*\",\"partitioning_wildcards\":[1],\"member_mappings\":[{\"member\":\"alice\",\"partitions\":[0,1]},{\"member\":\"bob\",\"partitions\":[2,3]}]}";
- ElasticConsumerGroupConfig config = GSON.fromJson(goJson, ElasticConsumerGroupConfig.class);
+ ElasticConsumerGroupConfig config = ElasticConsumerGroupConfig.instance(goJson.getBytes(StandardCharsets.UTF_8));
assertEquals(4, config.getMaxMembers());
assertEquals("bar.*", config.getFilter());
@@ -359,7 +363,7 @@ void testRevision() {
assertEquals(42, config.getRevision());
// Verify revision is not serialized
- String json = GSON.toJson(config);
+ String json = config.toJson();
assertFalse(json.contains("revision"));
}
diff --git a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java
index c48a3de..1faa142 100644
--- a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java
+++ b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java
@@ -13,9 +13,12 @@
package io.synadia.pcg;
+import io.nats.NatsRunnerUtils;
import io.nats.NatsServerRunner;
import io.nats.client.Connection;
+import io.nats.client.ErrorListener;
import io.nats.client.Nats;
+import io.nats.client.Options;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
@@ -27,8 +30,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Integration tests for Static and Elastic consumer groups.
@@ -36,6 +41,10 @@
*/
class IntegrationTest {
+ static {
+ NatsRunnerUtils.setDefaultOutputLevel(Level.SEVERE);
+ }
+
/**
* Ported from Go TestStatic.
* Creates a stream with subject transform, publishes 10 messages,
@@ -44,7 +53,8 @@ class IntegrationTest {
@Test
void testStatic() throws Exception {
try (NatsServerRunner server = NatsServerRunner.builder().jetstream(true).build()) {
- Connection nc = Nats.connect(server.getNatsLocalhostUri());
+ Options options = Options.builder().server(server.getNatsLocalhostUri()).errorListener(new ErrorListener() {}).build();
+ Connection nc = Nats.connect(options);
String streamName = "test";
String cgName = "group";
@@ -123,7 +133,8 @@ void testStatic() throws Exception {
@Test
void testElastic() throws Exception {
try (NatsServerRunner server = NatsServerRunner.builder().jetstream(true).build()) {
- Connection nc = Nats.connect(server.getNatsLocalhostUri());
+ Options options = Options.builder().server(server.getNatsLocalhostUri()).errorListener(new ErrorListener() {}).build();
+ Connection nc = Nats.connect(options);
String streamName = "test";
String cgName = "group";
diff --git a/pcgroups/src/test/java/io/synadia/pcg/PartitionUtilsTest.java b/pcgroups/src/test/java/io/synadia/pcg/PartitionUtilsTest.java
index fa65bc3..c11673c 100644
--- a/pcgroups/src/test/java/io/synadia/pcg/PartitionUtilsTest.java
+++ b/pcgroups/src/test/java/io/synadia/pcg/PartitionUtilsTest.java
@@ -13,14 +13,17 @@
package io.synadia.pcg;
+import io.nats.NatsRunnerUtils;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Unit tests for PartitionUtils.
@@ -28,6 +31,10 @@
*/
class PartitionUtilsTest {
+ static {
+ NatsRunnerUtils.setDefaultOutputLevel(Level.SEVERE);
+ }
+
@Test
void testComposeKey() {
assertEquals("mystream.mycg", PartitionUtils.composeKey("mystream", "mycg"));
diff --git a/pcgroups/src/test/java/io/synadia/pcg/StaticConsumerGroupTest.java b/pcgroups/src/test/java/io/synadia/pcg/StaticConsumerGroupTest.java
index dd8baeb..073d4ae 100644
--- a/pcgroups/src/test/java/io/synadia/pcg/StaticConsumerGroupTest.java
+++ b/pcgroups/src/test/java/io/synadia/pcg/StaticConsumerGroupTest.java
@@ -13,15 +13,17 @@
package io.synadia.pcg;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import io.nats.NatsRunnerUtils;
+import io.nats.client.support.JsonParseException;
import io.synadia.pcg.exceptions.ConsumerGroupException;
import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
import static org.junit.jupiter.api.Assertions.*;
@@ -31,7 +33,9 @@
*/
class StaticConsumerGroupTest {
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
+ static {
+ NatsRunnerUtils.setDefaultOutputLevel(Level.SEVERE);
+ }
@Test
void testConfigWithMembers() {
@@ -185,14 +189,14 @@ void testValidationWithMembersSuccess() throws ConsumerGroupException {
}
@Test
- void testJsonSerializationWithMembers() {
+ void testJsonSerializationWithMembers() throws JsonParseException {
StaticConsumerGroupConfig config = new StaticConsumerGroupConfig(
4, "foo.>",
Arrays.asList("m1", "m2"),
new ArrayList<>()
);
- String json = GSON.toJson(config);
+ String json = config.toJson();
// Verify JSON structure matches Go format
assertTrue(json.contains("\"max_members\":4"));
@@ -200,14 +204,14 @@ void testJsonSerializationWithMembers() {
assertTrue(json.contains("\"members\":[\"m1\",\"m2\"]"));
// Deserialize and verify
- StaticConsumerGroupConfig deserialized = GSON.fromJson(json, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig deserialized = StaticConsumerGroupConfig.instance(json.getBytes(StandardCharsets.UTF_8));
assertEquals(config.getMaxMembers(), deserialized.getMaxMembers());
assertEquals(config.getFilter(), deserialized.getFilter());
assertEquals(config.getMembers(), deserialized.getMembers());
}
@Test
- void testJsonSerializationWithMappings() {
+ void testJsonSerializationWithMappings() throws JsonParseException {
List mappings = Arrays.asList(
new MemberMapping("alice", new int[]{0, 1}),
new MemberMapping("bob", new int[]{2, 3})
@@ -217,7 +221,7 @@ void testJsonSerializationWithMappings() {
4, "foo.>", new ArrayList<>(), mappings
);
- String json = GSON.toJson(config);
+ String json = config.toJson();
// Verify JSON structure matches Go format
assertTrue(json.contains("\"max_members\":4"));
@@ -226,7 +230,7 @@ void testJsonSerializationWithMappings() {
assertTrue(json.contains("\"partitions\":[0,1]"));
// Deserialize and verify
- StaticConsumerGroupConfig deserialized = GSON.fromJson(json, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig deserialized = StaticConsumerGroupConfig.instance(json.getBytes(StandardCharsets.UTF_8));
assertEquals(config.getMaxMembers(), deserialized.getMaxMembers());
assertEquals(2, deserialized.getMemberMappings().size());
assertEquals("alice", deserialized.getMemberMappings().get(0).getMember());
@@ -234,11 +238,11 @@ void testJsonSerializationWithMappings() {
}
@Test
- void testJsonDeserializationFromGo() {
+ void testJsonDeserializationFromGo() throws JsonParseException {
// This JSON is in the format produced by the Go implementation
String goJson = "{\"max_members\":4,\"filter\":\"foo.>\",\"members\":[\"m1\",\"m2\",\"m3\",\"m4\"]}";
- StaticConsumerGroupConfig config = GSON.fromJson(goJson, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig config = StaticConsumerGroupConfig.instance(goJson.getBytes(StandardCharsets.UTF_8));
assertEquals(4, config.getMaxMembers());
assertEquals("foo.>", config.getFilter());
@@ -247,11 +251,11 @@ void testJsonDeserializationFromGo() {
}
@Test
- void testJsonDeserializationWithMappingsFromGo() {
+ void testJsonDeserializationWithMappingsFromGo() throws JsonParseException {
// This JSON is in the format produced by the Go implementation
String goJson = "{\"max_members\":4,\"filter\":\"bar.>\",\"member_mappings\":[{\"member\":\"alice\",\"partitions\":[0,1]},{\"member\":\"bob\",\"partitions\":[2,3]}]}";
- StaticConsumerGroupConfig config = GSON.fromJson(goJson, StaticConsumerGroupConfig.class);
+ StaticConsumerGroupConfig config = StaticConsumerGroupConfig.instance(goJson.getBytes(StandardCharsets.UTF_8));
assertEquals(4, config.getMaxMembers());
assertEquals("bar.>", config.getFilter());