Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .copyrightconfig
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ startyear: 2010
# - Dotfiles already skipped automatically
# Enable by removing the leading '# ' from the next line and editing values.
# filesexcluded: third_party/*, docs/generated/*.md, assets/*.png, scripts/temp_*.py, vendor/lib.js
filesexcluded: .github/*, README.md, Jenkinsfile, gradle/*, docker-compose.yaml, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, *.md, pom.xml, *.properties, *.json, *.xml
filesexcluded: .github/*, README.md, Jenkinsfile, gradle/*, docker-compose.yaml, docker-compose.yml, *.gradle, gradle.properties, gradlew, gradlew.bat, **/test/resources/**, *.md, pom.xml, *.properties, *.json, *.xml, CODEOWNERS
2 changes: 1 addition & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# Each line is a file pattern followed by one or more owners.

# These owners will be the default owners for everything in the repo.
* @anu3990 @billfarber @rjrudin @stevebio
* @billfarber @rjrudin @stevebio
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.document.DocumentWriteSet;

import java.util.function.Function;

/**
* A filter that can modify a DocumentWriteSet before it is written to the database.
*
* @since 8.1.0
*/
public interface DocumentWriteSetFilter extends Function<DocumentWriteSetFilter.Context, DocumentWriteSet> {

interface Context {
/**
* @return the DocumentWriteSet to be written
*/
DocumentWriteSet getDocumentWriteSet();

/**
* @return the batch number
*/
long getBatchNumber();

/**
* @return the DatabaseClient being used for this batch
*/
DatabaseClient getDatabaseClient();

/**
* @return the temporal collection name, or null if not writing to a temporal collection
*/
String getTemporalCollection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,17 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
* @param writeBatch the information about the batch that failed
*/
void retryWithFailureListeners(WriteBatch writeBatch);

/**
* Sets a filter to modify or replace the DocumentWriteSet before it is written.
* The filter can return either the modified DocumentWriteSet or a new one.
* If the filter returns null or an empty DocumentWriteSet, no write will occur.
*
* @param filter the function to apply before writing
* @return this instance for method chaining
* @since 8.1.0
*/
default WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.WriteEvent;
Expand All @@ -16,15 +17,17 @@
* Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the
* DocumentWriteSet is empty when this class is constructed.
*/
class BatchWriteSet {
class BatchWriteSet implements DocumentWriteSetFilter.Context {

private final WriteBatcher batcher;
private final DocumentWriteSet documentWriteSet;
private final long batchNumber;
private final DatabaseClient client;
private final ServerTransform transform;
private final String temporalCollection;

// Can be overridden after creation
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment 'Can be overridden after creation' is misleading as this is a private field, not an overridable method. Consider revising to 'Can be replaced after creation via updateWithFilteredDocumentWriteSet()' to more accurately describe the intended usage pattern.

Suggested change
// Can be overridden after creation
// Can be replaced after creation via updateWithFilteredDocumentWriteSet()

Copilot uses AI. Check for mistakes.
private DocumentWriteSet documentWriteSet;

private long itemsSoFar;
private Runnable onSuccess;
private Consumer<Throwable> onFailure;
Expand All @@ -38,10 +41,21 @@ class BatchWriteSet {
this.batchNumber = batchNumber;
}

/**
* Must be called if a DocumentWriteSetFilter modified the DocumentWriteSet owned by this class.
*
* @since 8.1.0
*/
void updateWithFilteredDocumentWriteSet(DocumentWriteSet filteredDocumentWriteSet) {
this.documentWriteSet = filteredDocumentWriteSet;
}

@Override
public DocumentWriteSet getDocumentWriteSet() {
return documentWriteSet;
}

@Override
public long getBatchNumber() {
return batchNumber;
}
Expand All @@ -50,6 +64,11 @@ public void setItemsSoFar(long itemsSoFar) {
this.itemsSoFar = itemsSoFar;
}

@Override
public DatabaseClient getDatabaseClient() {
return client;
}

public DatabaseClient getClient() {
return client;
}
Expand All @@ -58,6 +77,7 @@ public ServerTransform getTransform() {
return transform;
}

@Override
public String getTemporalCollection() {
return temporalCollection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.XMLDocumentManager;
Expand All @@ -13,7 +14,7 @@
import java.io.Closeable;
import java.util.function.Consumer;

record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {
record BatchWriter(BatchWriteSet batchWriteSet, DocumentWriteSetFilter filter) implements Runnable {

private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);

Expand All @@ -28,6 +29,16 @@ public void run() {
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());

DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
if (filter != null) {
documentWriteSet = filter.apply(batchWriteSet);
if (documentWriteSet == null || documentWriteSet.isEmpty()) {
logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber());
closeAllHandles();
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the filter returns an empty DocumentWriteSet, closeAllHandles() is called on the original batchWriteSet.getDocumentWriteSet() handles, but these handles are not necessarily the same as those in the filtered documentWriteSet. If the filter created a new DocumentWriteSet, the original handles from batchWriteSet won't be closed. Consider closing handles from both the original and filtered sets, or ensure handles are properly managed when filters return new DocumentWriteSets.

Copilot uses AI. Check for mistakes.
return;
}
batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet);
}

writeDocuments(documentWriteSet);

// This seems like it should be part of a finally block - but it's able to throw an exception. Which implies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class WriteBatcherImpl
private boolean initialized = false;
private CompletableThreadPoolExecutor threadPool = null;
private DocumentMetadataHandle defaultMetadata;
private DocumentWriteSetFilter documentWriteSetFilter;

public WriteBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
super(moveMgr);
Expand Down Expand Up @@ -200,7 +201,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
writeSet.getDocumentWriteSet().add(doc);
}
if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) {
threadPool.submit( new BatchWriter(writeSet) );
threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
}
}
return this;
Expand Down Expand Up @@ -308,7 +309,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
for (WriteEvent doc : batch.getItems()) {
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter runnable = new BatchWriter(writeSet);
BatchWriter runnable = new BatchWriter(writeSet, documentWriteSetFilter);
runnable.run();
}
@Override
Expand Down Expand Up @@ -379,7 +380,7 @@ private void flush(boolean waitForCompletion) {
DocumentWriteOperation doc = iter.next();
writeSet.getDocumentWriteSet().add(doc);
}
threadPool.submit( new BatchWriter(writeSet) );
threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
}

if (waitForCompletion) awaitCompletion();
Expand Down Expand Up @@ -597,7 +598,7 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter retryWriterTask = new BatchWriter(writeSet);
BatchWriter retryWriterTask = new BatchWriter(writeSet, documentWriteSetFilter);
Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask);
threadPool.replaceTask(writerTask, fretryWriterTask);
// jump to the next task
Expand Down Expand Up @@ -846,4 +847,10 @@ public void addAll(Stream<? extends DocumentWriteOperation> operations) {
public DocumentMetadataHandle getDocumentMetadata() {
return defaultMetadata;
}

@Override
public WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
this.documentWriteSetFilter = filter;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package com.marklogic.client.impl.okhttp;

import com.marklogic.client.MarkLogicIOException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
Expand Down Expand Up @@ -47,7 +48,7 @@ public Response intercept(Chain chain) throws IOException {
for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
return chain.proceed(request);
} catch (IOException e) {
} catch (MarkLogicIOException | IOException e) {
if (attempt == maxRetries || !isRetryableIOException(e)) {
logger.warn("Not retryable: {}; {}", e.getClass(), e.getMessage());
throw e;
Expand All @@ -65,7 +66,7 @@ public Response intercept(Chain chain) throws IOException {
throw new IllegalStateException("Unexpected end of retry loop");
}

private boolean isRetryableIOException(IOException e) {
private boolean isRetryableIOException(Exception e) {
return e instanceof ConnectException ||
e instanceof SocketTimeoutException ||
e instanceof UnknownHostException ||
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,20 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.test.datamovement;
package com.marklogic.client.datamovement;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.test.AbstractClientTest;
import com.marklogic.client.test.Common;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.xml.namespace.QName;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class WriteNakedPropertiesTest {

@BeforeEach
void setup() {
Common.newRestAdminClient().newXMLDocumentManager().delete("/naked.xml");
}
class WriteNakedPropertiesTest extends AbstractClientTest {

@Test
void test() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.test.AbstractClientTest;
import com.marklogic.client.test.Common;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;

class RemoveAllDocumentsFilterTest extends AbstractClientTest {

private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle()
.withCollections("incremental-test")
.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);

AtomicInteger writtenCount = new AtomicInteger();

@Test
void filterRemovesAllDocuments() {
new WriteBatcherTemplate(Common.newClient()).runWriteJob(
writeBatcher -> writeBatcher
.withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet())
.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),

writeBatcher -> {
for (int i = 1; i <= 10; i++) {
writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle("<doc/>"));
}
}
);

assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " +
"This test is verifying that no error will occur either when the filter doesn't return any documents.");
assertCollectionSize("incremental-test", 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;

import java.util.function.Consumer;

// Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
record WriteBatcherTemplate(DatabaseClient databaseClient) {
Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteBatcherTemplate record has package-private visibility but is used across different test packages (e.g., RemoveAllDocumentsFilterTest). Consider making it public or moving it to a more accessible location to support its intended cross-package usage.

Suggested change
record WriteBatcherTemplate(DatabaseClient databaseClient) {
public record WriteBatcherTemplate(DatabaseClient databaseClient) {

Copilot uses AI. Check for mistakes.

public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
WriteBatcher writeBatcher = dmm.newWriteBatcher();
writeBatcherConfigurer.accept(writeBatcher);

dmm.startJob(writeBatcher);
writeBatcherUser.accept(writeBatcher);
writeBatcher.flushAndWait();
writeBatcher.awaitCompletion();
dmm.stopJob(writeBatcher);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ protected final String getJavascriptForDeletingDocumentsBeforeTestRuns() {
void releaseClient() {
if (Common.client != null) {
Common.client.release();
Common.client = null;
}
}
}
Loading