Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group=com.marklogic
version=8.0-SNAPSHOT
version=8.1-SNAPSHOT
publishUrl=file:../marklogic-java/releases

okhttpVersion=5.3.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
*/
package com.marklogic.client.datamovement;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
* <p>To facilitate long-running write jobs, batches documents added by many
* external threads and coordinates internal threads to send the batches
Expand Down Expand Up @@ -182,12 +182,7 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
*
* @param queryEvent the information about the batch that failed
*/
public void retry(WriteBatch queryEvent);

/*
public WriteBatcher withTransactionSize(int transactionSize);
public int getTransactionSize();
*/
void retry(WriteBatch queryEvent);

/**
* Get the array of WriteBatchListener instances registered via
Expand Down Expand Up @@ -361,5 +356,5 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
*
* @param writeBatch the information about the batch that failed
*/
public void retryWithFailureListeners(WriteBatch writeBatch);
void retryWithFailureListeners(WriteBatch writeBatch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.marklogic.client.datamovement.impl;

import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.io.Format;
import org.slf4j.Logger;
Expand All @@ -12,70 +13,76 @@
import java.io.Closeable;
import java.util.function.Consumer;

class BatchWriter implements Runnable {
record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {

Copy link

Copilot AI Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation check for empty batch has been removed during the conversion to a record. Previously, the constructor threw an IllegalStateException if 'batchWriteSet.getDocumentWriteSet().size() == 0'. This validation should be added to a compact constructor in the record to maintain the same safety guarantee.

Suggested change
BatchWriter {
if (batchWriteSet.getDocumentWriteSet().size() == 0) {
throw new IllegalStateException("BatchWriter requires a non-empty DocumentWriteSet.");
}
}

Copilot uses AI. Check for mistakes.
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);

private final BatchWriteSet batchWriteSet;

BatchWriter(BatchWriteSet batchWriteSet) {
if (batchWriteSet.getDocumentWriteSet().size() == 0) {
throw new IllegalStateException("Attempt to write an empty batch");
}
this.batchWriteSet = batchWriteSet;
}

@Override
public void run() {
if (batchWriteSet.getDocumentWriteSet() == null || batchWriteSet.getDocumentWriteSet().isEmpty()) {
logger.debug("Unexpected empty batch {}, skipping", batchWriteSet.getBatchNumber());
return;
}

try {
logger.trace("begin write batch {} to forest on host \"{}\"", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
if (batchWriteSet.getTemporalCollection() == null) {
batchWriteSet.getClient().newDocumentManager().write(
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null
);
} else {
// to get access to the TemporalDocumentManager write overload we need to instantiate
// a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
// format, so we'll set the default content format to unknown
XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
docMgr.setContentFormat(Format.UNKNOWN);
docMgr.write(
batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection()
);
}
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());

DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
writeDocuments(documentWriteSet);

// This looks like it belongs in a finally block, but closeAllHandles() can itself throw. Because it runs
// inside the try, a failure to close a handle is reported through onFailure() and onSuccess() is skipped.
closeAllHandles();
Runnable onSuccess = batchWriteSet.getOnSuccess();
if (onSuccess != null) {
onSuccess.run();
}

onSuccess();
} catch (Throwable t) {
logger.trace("failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
Consumer<Throwable> onFailure = batchWriteSet.getOnFailure();
if (onFailure != null) {
onFailure.accept(t);
}
onFailure(t);
}
}

/**
 * Writes the given documents through the client attached to this batch, passing along the
 * batch's transform. When the batch has a temporal collection, the write goes through the
 * overload that accepts that collection.
 *
 * @param documentWriteSet the documents to send
 */
private void writeDocuments(DocumentWriteSet documentWriteSet) {
    if (batchWriteSet.getTemporalCollection() != null) {
        // The temporal write overload is only reachable through a format-specific manager
        // (JSON or XML). The XML manager is used here with its default content format reset
        // to unknown, so nothing is assumed about the format of the documents being written.
        XMLDocumentManager temporalManager = batchWriteSet.getClient().newXMLDocumentManager();
        temporalManager.setContentFormat(Format.UNKNOWN);
        temporalManager.write(documentWriteSet, batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection());
        return;
    }

    batchWriteSet.getClient().newDocumentManager().write(documentWriteSet, batchWriteSet.getTransform(), null);
}

/**
 * Runs the success callback registered on the batch write set, if there is one.
 */
private void onSuccess() {
    Runnable successCallback = batchWriteSet.getOnSuccess();
    if (successCallback == null) {
        return;
    }
    successCallback.run();
}

/**
 * Logs the failure at trace level, then hands the error to the failure callback registered on
 * the batch write set, if there is one.
 *
 * @param t the error raised while processing the batch
 */
private void onFailure(Throwable t) {
    logger.trace("Failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());

    Consumer<Throwable> failureCallback = batchWriteSet.getOnFailure();
    if (failureCallback == null) {
        return;
    }
    failureCallback.accept(t);
}

private void closeAllHandles() throws Throwable {
Throwable lastThrowable = null;
for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
try {
if (doc.getContent() instanceof Closeable) {
((Closeable) doc.getContent()).close();
if (doc.getContent() instanceof Closeable closeable) {
closeable.close();
}
if (doc.getMetadata() instanceof Closeable) {
((Closeable) doc.getMetadata()).close();
if (doc.getMetadata() instanceof Closeable closeable) {
closeable.close();
}
} catch (Throwable t) {
logger.error("error calling close()", t);
logger.error("Error closing all handles in BatchWriter", t);
lastThrowable = t;
}
}
if (lastThrowable != null) throw lastThrowable;
}

public BatchWriteSet getBatchWriteSet() {
return batchWriteSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,25 @@
*/
package com.marklogic.client.datamovement.impl;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.*;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.document.DocumentWriteOperation.OperationType;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.Utilities;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.marklogic.client.datamovement.DataMovementException;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.WriteBatch;
import com.marklogic.client.datamovement.WriteBatchListener;
import com.marklogic.client.datamovement.WriteEvent;
import com.marklogic.client.datamovement.WriteFailureListener;
import com.marklogic.client.datamovement.WriteBatcher;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/**
* The implementation of WriteBatcher.
Expand Down Expand Up @@ -254,19 +238,19 @@ public WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle
}

private void requireInitialized() {
if ( initialized == false ) {
if (!initialized) {
throw new IllegalStateException("This operation must be called after starting this job");
}
}

private void requireNotInitialized() {
if ( initialized == true ) {
if (initialized) {
throw new IllegalStateException("Configuration cannot be changed after starting this job or calling add or addAs");
}
}

private void requireNotStopped() {
if ( isStopped() == true ) throw new IllegalStateException("This instance has been stopped");
if (isStopped()) throw new IllegalStateException("This instance has been stopped");
}

private BatchWriteSet newBatchWriteSet() {
Expand All @@ -278,12 +262,8 @@ private BatchWriteSet newBatchWriteSet(long batchNum) {
int hostToUse = (int) (batchNum % hostInfos.length);
HostInfo host = hostInfos[hostToUse];
BatchWriteSet batchWriteSet = new BatchWriteSet(this, host.client, getTransform(), getTemporalCollection(), batchNum);
batchWriteSet.onSuccess( () -> {
sendSuccessToListeners(batchWriteSet);
});
batchWriteSet.onFailure( (throwable) -> {
sendThrowableToListeners(throwable, "Error writing batch: {}", batchWriteSet);
});
batchWriteSet.onSuccess( () -> sendSuccessToListeners(batchWriteSet));
batchWriteSet.onFailure(throwable -> sendThrowableToListeners(throwable, batchWriteSet));
return batchWriteSet;
}

Expand Down Expand Up @@ -311,7 +291,7 @@ public void retry(WriteBatch batch) {
}

private void retry(WriteBatch batch, boolean callFailListeners) {
if ( isStopped() == true ) {
if (isStopped()) {
logger.warn("Job is now stopped, aborting the retry");
return;
}
Expand Down Expand Up @@ -385,9 +365,9 @@ private void flush(boolean waitForCompletion) {
}
Iterator<DocumentWriteOperation> iter = docs.iterator();
for ( int i=0; iter.hasNext(); i++ ) {
if ( isStopped() == true ) {
if (isStopped()) {
logger.warn("Job is now stopped, preventing the flush of {} queued docs", docs.size() - i);
if ( waitForCompletion == true ) awaitCompletion();
if (waitForCompletion) awaitCompletion();
return;
}
BatchWriteSet writeSet = newBatchWriteSet();
Expand All @@ -402,7 +382,7 @@ private void flush(boolean waitForCompletion) {
threadPool.submit( new BatchWriter(writeSet) );
}

if ( waitForCompletion == true ) awaitCompletion();
if (waitForCompletion) awaitCompletion();
}

private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
Expand All @@ -417,7 +397,7 @@ private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
}
}

private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet batchWriteSet) {
private void sendThrowableToListeners(Throwable t, BatchWriteSet batchWriteSet) {
batchWriteSet.setItemsSoFar(itemsSoFar.get());
WriteBatch batch = batchWriteSet.getBatchOfWriteEvents();
for ( WriteFailureListener failureListener : failureListeners ) {
Expand All @@ -427,7 +407,7 @@ private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet
logger.error("Exception thrown by an onBatchFailure listener", t2);
}
}
if ( message != null ) logger.warn(message, t.toString());
logger.warn("Error writing batch: {}", t.toString());
}

@Override
Expand Down Expand Up @@ -606,15 +586,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
for ( Runnable task : tasks ) {
if ( task instanceof BatchWriter ) {
BatchWriter writerTask = (BatchWriter) task;
if ( removedHostInfos.containsKey(writerTask.getBatchWriteSet().getClient().getHost()) ) {
if ( removedHostInfos.containsKey(writerTask.batchWriteSet().getClient().getHost()) ) {
// this batch was targeting a host that's no longer on the list
// if we re-add these docs they'll now be in batches that target acceptable hosts
BatchWriteSet writeSet = newBatchWriteSet(writerTask.getBatchWriteSet().getBatchNumber());
BatchWriteSet writeSet = newBatchWriteSet(writerTask.batchWriteSet().getBatchNumber());
writeSet.onFailure(throwable -> {
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
else throw new DataMovementException("Failed to retry batch after failover", throwable);
});
for ( WriteEvent doc : writerTask.getBatchWriteSet().getBatchOfWriteEvents().getItems() ) {
for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter retryWriterTask = new BatchWriter(writeSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.marklogic.client.DatabaseClient;
import com.marklogic.junit5.AbstractMarkLogicTest;
import org.junit.jupiter.api.AfterEach;

/**
* Intended to be the base class for all future client API tests, as it properly prepares the database by deleting
Expand All @@ -25,4 +26,11 @@ protected final String getJavascriptForDeletingDocumentsBeforeTestRuns() {
.toArray().forEach(item => xdmp.documentDelete(item))
""";
}

/**
 * Releases the client held in {@code Common.client} after each test, if one has been assigned.
 */
@AfterEach
void releaseClient() {
    var currentClient = Common.client;
    if (currentClient == null) {
        return;
    }
    currentClient.release();
}
}