From a36d70967faa0bf36f5add24520d3d459f9f83f4 Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Wed, 21 Jan 2026 16:33:21 +0100 Subject: [PATCH 1/3] CI: replace set-output by env file Github actions has deprecated the old way of returning results of an action. This updates the use of `::set-output` to the new way of writing into an environment file. also fix leftover reference version: Leftover from the previous versioning change that led to inconsistent versions across build and deploy. Signed-off-by: Alexander Krimm --- .github/actions/get-version/version.sh | 6 +++--- .github/workflows/maven.yml | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/actions/get-version/version.sh b/.github/actions/get-version/version.sh index bd69fb84..0a0e3d32 100755 --- a/.github/actions/get-version/version.sh +++ b/.github/actions/get-version/version.sh @@ -25,8 +25,8 @@ else export sha1=-$(git rev-parse --short HEAD) export changelist="-SNAPSHOT" fi -echo "::set-output name=revision::$rev" -echo "::set-output name=sha1::$sha1" -echo "::set-output name=changelist::$changelist" +echo "revision=$rev" >> "$GITHUB_OUTPUT" +echo "sha1=$sha1" >> "$GITHUB_OUTPUT" +echo "changelist=$changelist" >> "$GITHUB_OUTPUT" echo "Version will be:" echo "${rev}${sha1}${changelist}" diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 032f27fb..a3636711 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -32,8 +32,6 @@ jobs: run: git fetch --depth=1 origin +refs/tags/*:refs/tags/* - id: version uses: ./.github/actions/get-version - with: - versionPattern: '${{ env.JAVA_REFERENCE_VERSION }}.[0-9]*.[0-9]*' - uses: actions/cache@v4 # cache maven packages to speed up build with: path: ~/.m2 From 0a251213eb140bdeb9da8a6176750730919839b6 Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Wed, 21 Jan 2026 14:16:07 +0100 Subject: [PATCH 2/3] Client: improve handling of reply context - Future: set atomic to ready only after the reply has been set - ReplyQuery: only include entries from reply context, do not duplicate entries from request url - DataSourcePublisher.get: allow access to the reply context from the returned future - DataSourcePublsher subscription listener: fix handling of Context type none Signed-off-by: Alexander Krimm --- .../opencmw/client/DataSourcePublisher.java | 53 +++++++++++-------- .../opencmw/client/OpenCmwDataSourceTest.java | 6 ++- .../java/io/opencmw/utils/CustomFuture.java | 4 +- .../io/opencmw/server/MajordomoWorker.java | 4 +- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/io/opencmw/client/DataSourcePublisher.java b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java index 74a4b0a6..199dfdd2 100644 --- a/client/src/main/java/io/opencmw/client/DataSourcePublisher.java +++ b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java @@ -20,10 +20,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -317,9 +314,9 @@ protected boolean handleDataSourceSockets() { return dataAvailable; } - protected ThePromisedFuture newRequestFuture(final URI endpoint, final Class requestedDomainObjType, final Command requestType, final String requestId) { + protected ThePromisedFuture newRequestFuture(final URI endpoint, final Class requestedDomainObjType, final Command requestType, final String requestId) { FilterRegistry.checkClassForNewFilters(requestedDomainObjType); - final ThePromisedFuture requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null); + final ThePromisedFuture requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null); final Object oldEntry = requests.put(requestId, requestFuture); assert oldEntry == null : "requestID '" + requestId + "' already present in requestFutureMap"; return requestFuture; @@ -367,19 +364,19 @@ protected void internalEventHandler(final RingBufferEvent event, final long sequ replyDomainObject = ioClassSerialiser.deserialiseObject(reqClassType); ioClassSerialiser.setDataBuffer(byteBuffer); // allow received byte array to be released } - if (notifyFuture) { - domainObject.future.castAndSetReply(replyDomainObject); // notify callback + final Object contextObject; + if (domainObject.future.contextType == null || domainObject.future.contextType.equals(Map.class)) { + contextObject = QueryParameterParser.getMap(endpointURI.getQuery()); + } else { + contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery()); } if (domainObject.future.listener != null) { final var finalDomainObj = replyDomainObject; - final Object contextObject; - if (domainObject.future.contextType == null) { - contextObject = QueryParameterParser.getMap(endpointURI.getQuery()); - } else { - contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery()); - } executor.submit(() -> domainObject.future.notifyListener(finalDomainObj, contextObject)); // NOPMD - threads are ok, not a webapp } + if (notifyFuture) { + domainObject.future.castAndSetReplyWithContext(replyDomainObject, contextObject); // notify callback + } } catch (Exception e) { // NOPMD: exception is forwarded to client final var sw = new StringWriter(); final var pw = new PrintWriter(sw); @@ -453,18 +450,18 @@ private Client() { // accessed via outer class method clientSocket.connect(inprocCtrl); } - public Future get(URI endpoint, final C requestContext, final Class requestedDomainObjType, final RbacProvider... rbacProvider) { + public ThePromisedFuture get(URI endpoint, final C requestContext, final Class requestedDomainObjType, final RbacProvider... rbacProvider) { final String requestId = clientId + internalReqIdGenerator.incrementAndGet(); final URI endpointQuery = getEndpointQuery(endpoint, requestContext); - final ThePromisedFuture rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId); + final ThePromisedFuture rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId); request(requestId, Command.GET_REQUEST, endpointQuery, null, requestContext, rbacProvider); return rThePromisedFuture; } - public Future set(final URI endpoint, final R requestBody, final C requestContext, final Class requestedDomainObjType, final RbacProvider... rbacProvider) { + public ThePromisedFuture set(final URI endpoint, final R requestBody, final C requestContext, final Class requestedDomainObjType, final RbacProvider... rbacProvider) { final String requestId = clientId + internalReqIdGenerator.incrementAndGet(); final URI endpointQuery = getEndpointQuery(endpoint, requestContext); - final ThePromisedFuture rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId); + final ThePromisedFuture rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId); request(requestId, Command.SET_REQUEST, endpointQuery, requestBody, requestContext, rbacProvider); return rThePromisedFuture; } @@ -551,8 +548,9 @@ private void request(final String requestId, final Command requestType, f } } - protected static class ThePromisedFuture extends CustomFuture { // NOPMD - no need for setters/getters here + public static class ThePromisedFuture extends CustomFuture { // NOPMD - no need for setters/getters here private final URI endpoint; + private C replyContext; private final Class requestedDomainObjType; private final Class contextType; private final Command requestType; @@ -586,10 +584,12 @@ public String getInternalRequestID() { } public void notifyListener(final Object obj, final Object contextObject) { - if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass()) || !contextType.isAssignableFrom(contextObject.getClass())) { + if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass())) { LOGGER.atError().addArgument(requestedDomainObjType.getName()).addArgument(obj == null ? "null" : obj.getClass().getName()).log("Got wrong type for notification, got {} expected {}"); + } else if (contextType != null && !contextType.isAssignableFrom(contextObject.getClass())) { + LOGGER.atError().addArgument(contextObject.getClass().getName()).addArgument(contextType.getName()).log("Got wrong context type for notification, got {} expected {}"); } else { - //noinspection unchecked - cast is checked dynamically + // noinspection unchecked - cast is checked dynamically listener.dataUpdate((R) obj, (C) contextObject); // NOPMD NOSONAR - cast is checked before implicitly } } @@ -598,6 +598,17 @@ public void notifyListener(final Object obj, final Object contextObject) { protected void castAndSetReply(final Object newValue) { this.setReply((R) newValue); } + + @SuppressWarnings("unchecked") + protected void castAndSetReplyWithContext(final Object newValue, final Object contextObject) { + this.replyContext = (C) contextObject; + this.setReply((R) newValue); + } + + public C getReplyContext() throws ExecutionException, InterruptedException { + super.get(); + return this.replyContext; + } } protected static class InternalDomainObject { diff --git a/client/src/test/java/io/opencmw/client/OpenCmwDataSourceTest.java b/client/src/test/java/io/opencmw/client/OpenCmwDataSourceTest.java index 56908cc2..193b5176 100644 --- a/client/src/test/java/io/opencmw/client/OpenCmwDataSourceTest.java +++ b/client/src/test/java/io/opencmw/client/OpenCmwDataSourceTest.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Queue; @@ -257,7 +258,7 @@ void testGetRequest() throws InterruptedException, ExecutionException, TimeoutEx // get request final URI requestURI = URI.create(brokerAddress + "/testWorker?ctx=FAIR.SELECTOR.C=3&contentType=application/octet-stream"); LOGGER.atDebug().addArgument(requestURI).log("requesting GET from endpoint: {}"); - final Future future; + final DataSourcePublisher.ThePromisedFuture>> future; try (final DataSourcePublisher.Client client = dataSourcePublisher.getClient()) { future = client.get(requestURI, null, TestObject.class); // uri_without_query oder serviceName + resolver, requestContext, type } @@ -265,10 +266,13 @@ void testGetRequest() throws InterruptedException, ExecutionException, TimeoutEx // assert result final TestObject result = future.get(1000, TimeUnit.MILLISECONDS); assertEquals(referenceObject, result); + final Map> context = future.getReplyContext(); + assertEquals("FAIR.SELECTOR.C=3:P=5", context.get("ctx").get(0)); eventStore.stop(); dataSourcePublisher.stop(); } + @Test void testGetRequestWithContext() throws InterruptedException, ExecutionException, TimeoutException { final TestObject referenceObject = new TestObject("asdf", 42); diff --git a/core/src/main/java/io/opencmw/utils/CustomFuture.java b/core/src/main/java/io/opencmw/utils/CustomFuture.java index 80df5c02..69ac6a94 100644 --- a/core/src/main/java/io/opencmw/utils/CustomFuture.java +++ b/core/src/main/java/io/opencmw/utils/CustomFuture.java @@ -92,18 +92,18 @@ public boolean isDone() { * @throws IllegalStateException in case this method has been already called or Future has been cancelled */ public void setReply(final T newValue) { + this.reply = newValue; if (done.getAndSet(true)) { throw new IllegalStateException("future is not running anymore (either cancelled or already notified)"); } - this.reply = newValue; notifyListener(); } public void setException(final Throwable exception) { + this.exception = exception; if (done.getAndSet(true)) { throw new IllegalStateException("future is not running anymore (either cancelled or already notified)"); } - this.exception = exception; notifyListener(); } diff --git a/server/src/main/java/io/opencmw/server/MajordomoWorker.java b/server/src/main/java/io/opencmw/server/MajordomoWorker.java index 6e4a2616..d3d74ebd 100644 --- a/server/src/main/java/io/opencmw/server/MajordomoWorker.java +++ b/server/src/main/java/io/opencmw/server/MajordomoWorker.java @@ -214,9 +214,7 @@ protected void serialiseData(final IoClassSerialiser classSerialiser, final IoBu final URI reqTopic = rawCtx.req.topic; rawCtx.rep.topic = new URI(reqTopic.getScheme(), reqTopic.getAuthority(), reqTopic.getPath(), replyQuery, reqTopic.getFragment()); } else { - final String oldQuery = rawCtx.rep.topic.getQuery(); - final String newQuery = oldQuery == null || oldQuery.isBlank() ? replyQuery : (oldQuery + "&" + replyQuery); - rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), newQuery, null); + rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), replyQuery, null); } final MimeType replyMimeType = QueryParameterParser.getMimeType(replyQuery); // no MIME type given -> stick with the one specified in the request (if it exists) or keep default: copy of raw binary data From 0c14f649acfdcc678e171b67ebfd0f9c290f8f80 Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Mon, 26 Jan 2026 15:14:41 +0100 Subject: [PATCH 3/3] DataSourcePublisher: allow to pass binary data Allows to provide pre-serialised binary data to Set commands the same way as to receive non-deserialized data via the BinaryData special domain object. This is needed for use-cases which provide run-time dynamic set and get access e.g. via a map interface. Signed-off-by: Alexander Krimm --- client/src/main/java/io/opencmw/client/DataSourcePublisher.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/java/io/opencmw/client/DataSourcePublisher.java b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java index 199dfdd2..769e33b6 100644 --- a/client/src/main/java/io/opencmw/client/DataSourcePublisher.java +++ b/client/src/main/java/io/opencmw/client/DataSourcePublisher.java @@ -526,6 +526,8 @@ private void request(final String requestId, final Command requestType, f msg.add(endpoint.toString()); if (requestBody == null) { msg.add(EMPTY_ZFRAME); + } else if (requestBody instanceof BinaryData binaryData) { + msg.add(Arrays.copyOfRange(binaryData.data, 0, binaryData.dataSize > 0 ? binaryData.dataSize : binaryData.data.length)); } else { final Class matchingSerialiser = DataSource.getFactory(endpoint).getMatchingSerialiserType(endpoint); final IoClassSerialiser serialiser = getSerialiser(); // lazily initialize IoClassSerialiser