From a285cd5c487cab97c27468bca2cdc94687a38947 Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Tue, 30 Dec 2025 17:01:51 -0500 Subject: [PATCH] MLE-26420 Added DocumentWriteSetFilter Started making tests under "com.marklogic.client.datamovement" as well so that protected methods can be unit-tested. --- .copyrightconfig | 2 +- CODEOWNERS | 2 +- .../datamovement/DocumentWriteSetFilter.java | 39 +++++++++++ .../client/datamovement/WriteBatcher.java | 13 ++++ .../datamovement/impl/BatchWriteSet.java | 24 ++++++- .../client/datamovement/impl/BatchWriter.java | 13 +++- .../datamovement/impl/WriteBatcherImpl.java | 15 ++-- .../okhttp/RetryIOExceptionInterceptor.java | 5 +- .../WriteNakedPropertiesTest.java | 13 +--- .../filter/RemoveAllDocumentsFilterTest.java | 42 ++++++++++++ .../filter/WriteBatcherTemplate.java | 27 ++++++++ .../client/test/AbstractClientTest.java | 1 + .../datamovement/IncrementalWriteTest.java | 68 ------------------- 13 files changed, 175 insertions(+), 89 deletions(-) create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java rename marklogic-client-api/src/test/java/com/marklogic/client/{test => }/datamovement/WriteNakedPropertiesTest.java (80%) create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/RemoveAllDocumentsFilterTest.java create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/WriteBatcherTemplate.java delete mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java diff --git a/.copyrightconfig b/.copyrightconfig index 1782ce3dc..c87b8a91b 100644 --- a/.copyrightconfig +++ b/.copyrightconfig @@ -11,4 +11,4 @@ startyear: 2010 # - Dotfiles already skipped automatically # Enable by removing the leading '# ' from the next line and editing values. 
# filesexcluded: third_party/*, docs/generated/*.md, assets/*.png, scripts/temp_*.py, vendor/lib.js -filesexcluded: .github/*, README.md, Jenkinsfile, gradle/*, docker-compose.yaml, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, *.md, pom.xml, *.properties, *.json, *.xml +filesexcluded: .github/*, README.md, Jenkinsfile, gradle/*, docker-compose.yaml, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, *.md, pom.xml, *.properties, *.json, *.xml, CODEOWNERS diff --git a/CODEOWNERS b/CODEOWNERS index 94edd30a2..4eff9ce7f 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -2,4 +2,4 @@ # Each line is a file pattern followed by one or more owners. # These owners will be the default owners for everything in the repo. -* @anu3990 @billfarber @rjrudin @stevebio +* @billfarber @rjrudin @stevebio diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java new file mode 100644 index 000000000..7f00d8161 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement; + +import com.marklogic.client.DatabaseClient; +import com.marklogic.client.document.DocumentWriteSet; + +import java.util.function.Function; + +/** + * A filter that can modify a DocumentWriteSet before it is written to the database. 
+ * + * @since 8.1.0 + */ +public interface DocumentWriteSetFilter extends Function<DocumentWriteSetFilter.Context, DocumentWriteSet> { + + interface Context { + /** + * @return the DocumentWriteSet to be written + */ + DocumentWriteSet getDocumentWriteSet(); + + /** + * @return the batch number + */ + long getBatchNumber(); + + /** + * @return the DatabaseClient being used for this batch + */ + DatabaseClient getDatabaseClient(); + + /** + * @return the temporal collection name, or null if not writing to a temporal collection + */ + String getTemporalCollection(); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java index 656b029cb..0facc145f 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java @@ -357,4 +357,17 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle, * @param writeBatch the information about the batch that failed */ void retryWithFailureListeners(WriteBatch writeBatch); + + /** + * Sets a filter to modify or replace the DocumentWriteSet before it is written. + * The filter can return either the modified DocumentWriteSet or a new one. + * If the filter returns null or an empty DocumentWriteSet, no write will occur. 
+ * + * @param filter the function to apply before writing + * @return this instance for method chaining + * @since 8.1.0 + */ + default WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) { + return this; + } } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java index 0c08fdd7b..f6e91c91a 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java @@ -4,6 +4,7 @@ package com.marklogic.client.datamovement.impl; import com.marklogic.client.DatabaseClient; +import com.marklogic.client.datamovement.DocumentWriteSetFilter; import com.marklogic.client.datamovement.WriteBatch; import com.marklogic.client.datamovement.WriteBatcher; import com.marklogic.client.datamovement.WriteEvent; @@ -16,15 +17,17 @@ * Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the * DocumentWriteSet is empty when this class is constructed. */ -class BatchWriteSet { +class BatchWriteSet implements DocumentWriteSetFilter.Context { private final WriteBatcher batcher; - private final DocumentWriteSet documentWriteSet; private final long batchNumber; private final DatabaseClient client; private final ServerTransform transform; private final String temporalCollection; + // Can be overridden after creation + private DocumentWriteSet documentWriteSet; + private long itemsSoFar; private Runnable onSuccess; private Consumer onFailure; @@ -38,10 +41,21 @@ class BatchWriteSet { this.batchNumber = batchNumber; } + /** + * Must be called if a DocumentWriteSetFilter modified the DocumentWriteSet owned by this class. 
+ * + * @since 8.1.0 + */ + void updateWithFilteredDocumentWriteSet(DocumentWriteSet filteredDocumentWriteSet) { + this.documentWriteSet = filteredDocumentWriteSet; + } + + @Override public DocumentWriteSet getDocumentWriteSet() { return documentWriteSet; } + @Override public long getBatchNumber() { return batchNumber; } @@ -50,6 +64,11 @@ public void setItemsSoFar(long itemsSoFar) { this.itemsSoFar = itemsSoFar; } + @Override + public DatabaseClient getDatabaseClient() { + return client; + } + public DatabaseClient getClient() { return client; } @@ -58,6 +77,7 @@ public ServerTransform getTransform() { return transform; } + @Override public String getTemporalCollection() { return temporalCollection; } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java index 2173034dd..a2ebe835d 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java @@ -3,6 +3,7 @@ */ package com.marklogic.client.datamovement.impl; +import com.marklogic.client.datamovement.DocumentWriteSetFilter; import com.marklogic.client.document.DocumentWriteOperation; import com.marklogic.client.document.DocumentWriteSet; import com.marklogic.client.document.XMLDocumentManager; @@ -13,7 +14,7 @@ import java.io.Closeable; import java.util.function.Consumer; -record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable { +record BatchWriter(BatchWriteSet batchWriteSet, DocumentWriteSetFilter filter) implements Runnable { private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class); @@ -28,6 +29,16 @@ public void run() { logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost()); DocumentWriteSet documentWriteSet = 
batchWriteSet.getDocumentWriteSet(); + if (filter != null) { + documentWriteSet = filter.apply(batchWriteSet); + if (documentWriteSet == null || documentWriteSet.isEmpty()) { + logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber()); + closeAllHandles(); + return; + } + batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet); + } + writeDocuments(documentWriteSet); // This seems like it should be part of a finally block - but it's able to throw an exception. Which implies diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java index 154068522..1b376fb85 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java @@ -126,6 +126,7 @@ public class WriteBatcherImpl private boolean initialized = false; private CompletableThreadPoolExecutor threadPool = null; private DocumentMetadataHandle defaultMetadata; + private DocumentWriteSetFilter documentWriteSetFilter; public WriteBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) { super(moveMgr); @@ -200,7 +201,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) { writeSet.getDocumentWriteSet().add(doc); } if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) { - threadPool.submit( new BatchWriter(writeSet) ); + threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) ); } } return this; @@ -308,7 +309,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) { for (WriteEvent doc : batch.getItems()) { writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent()); } - BatchWriter runnable = new BatchWriter(writeSet); + BatchWriter runnable = new BatchWriter(writeSet, 
documentWriteSetFilter); runnable.run(); } @Override @@ -379,7 +380,7 @@ private void flush(boolean waitForCompletion) { DocumentWriteOperation doc = iter.next(); writeSet.getDocumentWriteSet().add(doc); } - threadPool.submit( new BatchWriter(writeSet) ); + threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) ); } if (waitForCompletion) awaitCompletion(); @@ -597,7 +598,7 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) { writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent()); } - BatchWriter retryWriterTask = new BatchWriter(writeSet); + BatchWriter retryWriterTask = new BatchWriter(writeSet, documentWriteSetFilter); Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask); threadPool.replaceTask(writerTask, fretryWriterTask); // jump to the next task @@ -846,4 +847,10 @@ public void addAll(Stream operations) { public DocumentMetadataHandle getDocumentMetadata() { return defaultMetadata; } + + @Override + public WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) { + this.documentWriteSetFilter = filter; + return this; + } } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java index 656e399c5..b2f57e0c3 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java @@ -3,6 +3,7 @@ */ package com.marklogic.client.impl.okhttp; +import com.marklogic.client.MarkLogicIOException; import okhttp3.Interceptor; import okhttp3.Request; import okhttp3.Response; @@ -47,7 +48,7 @@ public Response intercept(Chain chain) throws IOException { for (int 
attempt = 0; attempt <= maxRetries; attempt++) { try { return chain.proceed(request); - } catch (IOException e) { + } catch (MarkLogicIOException | IOException e) { if (attempt == maxRetries || !isRetryableIOException(e)) { logger.warn("Not retryable: {}; {}", e.getClass(), e.getMessage()); throw e; @@ -65,7 +66,7 @@ public Response intercept(Chain chain) throws IOException { throw new IllegalStateException("Unexpected end of retry loop"); } - private boolean isRetryableIOException(IOException e) { + private boolean isRetryableIOException(Exception e) { return e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof UnknownHostException || diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java similarity index 80% rename from marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java rename to marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java index e97f87158..fb7b58d89 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java @@ -1,14 +1,12 @@ /* * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
*/ -package com.marklogic.client.test.datamovement; +package com.marklogic.client.datamovement; import com.marklogic.client.DatabaseClient; -import com.marklogic.client.datamovement.DataMovementManager; -import com.marklogic.client.datamovement.WriteBatcher; import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.test.AbstractClientTest; import com.marklogic.client.test.Common; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.xml.namespace.QName; @@ -16,12 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -public class WriteNakedPropertiesTest { - - @BeforeEach - void setup() { - Common.newRestAdminClient().newXMLDocumentManager().delete("/naked.xml"); - } +class WriteNakedPropertiesTest extends AbstractClientTest { @Test void test() { diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/RemoveAllDocumentsFilterTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/RemoveAllDocumentsFilterTest.java new file mode 100644 index 000000000..40a78b817 --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/RemoveAllDocumentsFilterTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
+ */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.io.StringHandle; +import com.marklogic.client.test.AbstractClientTest; +import com.marklogic.client.test.Common; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class RemoveAllDocumentsFilterTest extends AbstractClientTest { + + private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle() + .withCollections("incremental-test") + .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE); + + AtomicInteger writtenCount = new AtomicInteger(); + + @Test + void filterRemovesAllDocuments() { + new WriteBatcherTemplate(Common.newClient()).runWriteJob( + writeBatcher -> writeBatcher + .withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet()) + .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)), + + writeBatcher -> { + for (int i = 1; i <= 10; i++) { + writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle("")); + } + } + ); + + assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. 
" + + "This test is verifying that no error will occur either when the filter doesn't return any documents."); + assertCollectionSize("incremental-test", 0); + } +} diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/WriteBatcherTemplate.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/WriteBatcherTemplate.java new file mode 100644 index 000000000..62e066949 --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/WriteBatcherTemplate.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.DatabaseClient; +import com.marklogic.client.datamovement.DataMovementManager; +import com.marklogic.client.datamovement.WriteBatcher; + +import java.util.function.Consumer; + +// Experimenting with a template that gets rid of some annoying DMSDK boilerplate. 
+record WriteBatcherTemplate(DatabaseClient databaseClient) { + + public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) { + try (DataMovementManager dmm = databaseClient.newDataMovementManager()) { + WriteBatcher writeBatcher = dmm.newWriteBatcher(); + writeBatcherConfigurer.accept(writeBatcher); + + dmm.startJob(writeBatcher); + writeBatcherUser.accept(writeBatcher); + writeBatcher.flushAndWait(); + writeBatcher.awaitCompletion(); + dmm.stopJob(writeBatcher); + } + } +} diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java index f1e8f726d..0f9c5af8a 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java @@ -31,6 +31,7 @@ protected final String getJavascriptForDeletingDocumentsBeforeTestRuns() { void releaseClient() { if (Common.client != null) { Common.client.release(); + Common.client = null; } } } diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java deleted file mode 100644 index 83a81e1c5..000000000 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
- */ -package com.marklogic.client.test.datamovement; - -import com.marklogic.client.DatabaseClient; -import com.marklogic.client.datamovement.DataMovementManager; -import com.marklogic.client.datamovement.WriteBatcher; -import com.marklogic.client.io.DocumentMetadataHandle; -import com.marklogic.client.io.StringHandle; -import com.marklogic.client.test.AbstractClientTest; -import com.marklogic.client.test.Common; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -class IncrementalWriteTest extends AbstractClientTest { - - private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle() - .withCollections("incremental-test") - .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE); - - @Test - void test() { - AtomicInteger writtenCount = new AtomicInteger(); - - try (DatabaseClient client = Common.newClient()) { - WriteBatcherTemplate template = new WriteBatcherTemplate(client); - - template.runWriteJob(writeBatcher -> writeBatcher - .withThreadCount(1) - .withBatchSize(10) - .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)), - - writeBatcher -> { - for (int i = 1; i <= 20; i++) { - String uri = "/incremental/test/doc-" + i + ".xml"; - String content = "" + i + "This is document number " + i + ""; - writeBatcher.add(uri, METADATA, new StringHandle(content)); - } - } - ); - } - - assertEquals(20, writtenCount.get()); - } - - // Experimenting with a template that gets rid of some annoying DMSDK boilerplate. 
- private record WriteBatcherTemplate(DatabaseClient databaseClient) { - - public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) { - try (DataMovementManager dmm = databaseClient.newDataMovementManager()) { - WriteBatcher writeBatcher = dmm.newWriteBatcher(); - writeBatcherConfigurer.accept(writeBatcher); - - dmm.startJob(writeBatcher); - - writeBatcherUser.accept(writeBatcher); - writeBatcher.awaitCompletion(); - - dmm.stopJob(writeBatcher); - } - } - } -}