diff --git a/gradle.properties b/gradle.properties
index 7e8c4ac9b..d109496b4 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
group=com.marklogic
-version=8.0-SNAPSHOT
+version=8.1-SNAPSHOT
publishUrl=file:../marklogic-java/releases
okhttpVersion=5.3.2
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java
index 2757ad58b..656b029cb 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java
@@ -3,15 +3,15 @@
*/
package com.marklogic.client.datamovement;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
/**
*
To facilitate long-running write jobs, batches documents added by many
* external threads and coordinates internal threads to send the batches
@@ -182,12 +182,7 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
*
* @param queryEvent the information about the batch that failed
*/
- public void retry(WriteBatch queryEvent);
-
- /*
- public WriteBatcher withTransactionSize(int transactionSize);
- public int getTransactionSize();
- */
+ void retry(WriteBatch queryEvent);
/**
* Get the array of WriteBatchListener instances registered via
@@ -361,5 +356,5 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
*
* @param writeBatch the information about the batch that failed
*/
- public void retryWithFailureListeners(WriteBatch writeBatch);
+ void retryWithFailureListeners(WriteBatch writeBatch);
}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
index 037a781f3..2173034dd 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
@@ -4,6 +4,7 @@
package com.marklogic.client.datamovement.impl;
import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.document.XMLDocumentManager;
import com.marklogic.client.io.Format;
import org.slf4j.Logger;
@@ -12,48 +13,58 @@
import java.io.Closeable;
import java.util.function.Consumer;
-class BatchWriter implements Runnable {
+record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
- private final BatchWriteSet batchWriteSet;
-
- BatchWriter(BatchWriteSet batchWriteSet) {
- if (batchWriteSet.getDocumentWriteSet().size() == 0) {
- throw new IllegalStateException("Attempt to write an empty batch");
- }
- this.batchWriteSet = batchWriteSet;
- }
-
@Override
public void run() {
+ if (batchWriteSet.getDocumentWriteSet() == null || batchWriteSet.getDocumentWriteSet().isEmpty()) {
+ logger.debug("Unexpected empty batch {}, skipping", batchWriteSet.getBatchNumber());
+ return;
+ }
+
try {
- logger.trace("begin write batch {} to forest on host \"{}\"", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
- if (batchWriteSet.getTemporalCollection() == null) {
- batchWriteSet.getClient().newDocumentManager().write(
- batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null
- );
- } else {
- // to get access to the TemporalDocumentManager write overload we need to instantiate
- // a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
- // format, so we'll set the default content format to unknown
- XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
- docMgr.setContentFormat(Format.UNKNOWN);
- docMgr.write(
- batchWriteSet.getDocumentWriteSet(), batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection()
- );
- }
+ logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
+
+ DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
+ writeDocuments(documentWriteSet);
+
+ // This seems like it should be part of a finally block - but it's able to throw an exception. Which implies
+ // that onFailure() should occur when this fails, which seems odd???
closeAllHandles();
- Runnable onSuccess = batchWriteSet.getOnSuccess();
- if (onSuccess != null) {
- onSuccess.run();
- }
+
+ onSuccess();
} catch (Throwable t) {
- logger.trace("failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
- Consumer onFailure = batchWriteSet.getOnFailure();
- if (onFailure != null) {
- onFailure.accept(t);
- }
+ onFailure(t);
+ }
+ }
+
+ private void writeDocuments(DocumentWriteSet documentWriteSet) {
+ if (batchWriteSet.getTemporalCollection() == null) {
+ batchWriteSet.getClient().newDocumentManager().write(documentWriteSet, batchWriteSet.getTransform(), null);
+ } else {
+ // to get access to the TemporalDocumentManager write overload we need to instantiate
+ // a JSONDocumentManager or XMLDocumentManager, but we don't want to make assumptions about content
+ // format, so we'll set the default content format to unknown
+ XMLDocumentManager docMgr = batchWriteSet.getClient().newXMLDocumentManager();
+ docMgr.setContentFormat(Format.UNKNOWN);
+ docMgr.write(documentWriteSet, batchWriteSet.getTransform(), null, batchWriteSet.getTemporalCollection());
+ }
+ }
+
+ private void onSuccess() {
+ Runnable onSuccess = batchWriteSet.getOnSuccess();
+ if (onSuccess != null) {
+ onSuccess.run();
+ }
+ }
+
+ private void onFailure(Throwable t) {
+ logger.trace("Failed batch sent to forest on host \"{}\"", batchWriteSet.getClient().getHost());
+ Consumer onFailure = batchWriteSet.getOnFailure();
+ if (onFailure != null) {
+ onFailure.accept(t);
}
}
@@ -61,21 +72,17 @@ private void closeAllHandles() throws Throwable {
Throwable lastThrowable = null;
for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
try {
- if (doc.getContent() instanceof Closeable) {
- ((Closeable) doc.getContent()).close();
+ if (doc.getContent() instanceof Closeable closeable) {
+ closeable.close();
}
- if (doc.getMetadata() instanceof Closeable) {
- ((Closeable) doc.getMetadata()).close();
+ if (doc.getMetadata() instanceof Closeable closeable) {
+ closeable.close();
}
} catch (Throwable t) {
- logger.error("error calling close()", t);
+ logger.error("Error closing all handles in BatchWriter", t);
lastThrowable = t;
}
}
if (lastThrowable != null) throw lastThrowable;
}
-
- public BatchWriteSet getBatchWriteSet() {
- return batchWriteSet;
- }
}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
index e7ae80d9c..154068522 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
@@ -3,41 +3,25 @@
*/
package com.marklogic.client.datamovement.impl;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Stream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
+import com.marklogic.client.datamovement.*;
import com.marklogic.client.document.DocumentWriteOperation;
-import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.document.DocumentWriteOperation.OperationType;
-import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.impl.Utilities;
+import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.DocumentMetadataWriteHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.marklogic.client.datamovement.DataMovementException;
-import com.marklogic.client.datamovement.DataMovementManager;
-import com.marklogic.client.datamovement.Forest;
-import com.marklogic.client.datamovement.ForestConfiguration;
-import com.marklogic.client.datamovement.JobTicket;
-import com.marklogic.client.datamovement.WriteBatch;
-import com.marklogic.client.datamovement.WriteBatchListener;
-import com.marklogic.client.datamovement.WriteEvent;
-import com.marklogic.client.datamovement.WriteFailureListener;
-import com.marklogic.client.datamovement.WriteBatcher;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
/**
* The implementation of WriteBatcher.
@@ -254,19 +238,19 @@ public WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle
}
private void requireInitialized() {
- if ( initialized == false ) {
+ if (!initialized) {
throw new IllegalStateException("This operation must be called after starting this job");
}
}
private void requireNotInitialized() {
- if ( initialized == true ) {
+ if (initialized) {
throw new IllegalStateException("Configuration cannot be changed after starting this job or calling add or addAs");
}
}
private void requireNotStopped() {
- if ( isStopped() == true ) throw new IllegalStateException("This instance has been stopped");
+ if (isStopped()) throw new IllegalStateException("This instance has been stopped");
}
private BatchWriteSet newBatchWriteSet() {
@@ -278,12 +262,8 @@ private BatchWriteSet newBatchWriteSet(long batchNum) {
int hostToUse = (int) (batchNum % hostInfos.length);
HostInfo host = hostInfos[hostToUse];
BatchWriteSet batchWriteSet = new BatchWriteSet(this, host.client, getTransform(), getTemporalCollection(), batchNum);
- batchWriteSet.onSuccess( () -> {
- sendSuccessToListeners(batchWriteSet);
- });
- batchWriteSet.onFailure( (throwable) -> {
- sendThrowableToListeners(throwable, "Error writing batch: {}", batchWriteSet);
- });
+ batchWriteSet.onSuccess( () -> sendSuccessToListeners(batchWriteSet));
+ batchWriteSet.onFailure(throwable -> sendThrowableToListeners(throwable, batchWriteSet));
return batchWriteSet;
}
@@ -311,7 +291,7 @@ public void retry(WriteBatch batch) {
}
private void retry(WriteBatch batch, boolean callFailListeners) {
- if ( isStopped() == true ) {
+ if (isStopped()) {
logger.warn("Job is now stopped, aborting the retry");
return;
}
@@ -385,9 +365,9 @@ private void flush(boolean waitForCompletion) {
}
Iterator iter = docs.iterator();
for ( int i=0; iter.hasNext(); i++ ) {
- if ( isStopped() == true ) {
+ if (isStopped()) {
logger.warn("Job is now stopped, preventing the flush of {} queued docs", docs.size() - i);
- if ( waitForCompletion == true ) awaitCompletion();
+ if (waitForCompletion) awaitCompletion();
return;
}
BatchWriteSet writeSet = newBatchWriteSet();
@@ -402,7 +382,7 @@ private void flush(boolean waitForCompletion) {
threadPool.submit( new BatchWriter(writeSet) );
}
- if ( waitForCompletion == true ) awaitCompletion();
+ if (waitForCompletion) awaitCompletion();
}
private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
@@ -417,7 +397,7 @@ private void sendSuccessToListeners(BatchWriteSet batchWriteSet) {
}
}
- private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet batchWriteSet) {
+ private void sendThrowableToListeners(Throwable t, BatchWriteSet batchWriteSet) {
batchWriteSet.setItemsSoFar(itemsSoFar.get());
WriteBatch batch = batchWriteSet.getBatchOfWriteEvents();
for ( WriteFailureListener failureListener : failureListeners ) {
@@ -427,7 +407,7 @@ private void sendThrowableToListeners(Throwable t, String message, BatchWriteSet
logger.error("Exception thrown by an onBatchFailure listener", t2);
}
}
- if ( message != null ) logger.warn(message, t.toString());
+ logger.warn("Error writing batch: {}", t.toString());
}
@Override
@@ -606,15 +586,15 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
for ( Runnable task : tasks ) {
if ( task instanceof BatchWriter ) {
BatchWriter writerTask = (BatchWriter) task;
- if ( removedHostInfos.containsKey(writerTask.getBatchWriteSet().getClient().getHost()) ) {
+ if ( removedHostInfos.containsKey(writerTask.batchWriteSet().getClient().getHost()) ) {
// this batch was targeting a host that's no longer on the list
// if we re-add these docs they'll now be in batches that target acceptable hosts
- BatchWriteSet writeSet = newBatchWriteSet(writerTask.getBatchWriteSet().getBatchNumber());
+ BatchWriteSet writeSet = newBatchWriteSet(writerTask.batchWriteSet().getBatchNumber());
writeSet.onFailure(throwable -> {
if ( throwable instanceof RuntimeException ) throw (RuntimeException) throwable;
else throw new DataMovementException("Failed to retry batch after failover", throwable);
});
- for ( WriteEvent doc : writerTask.getBatchWriteSet().getBatchOfWriteEvents().getItems() ) {
+ for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
}
BatchWriter retryWriterTask = new BatchWriter(writeSet);
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java
index 020aa21d7..f1e8f726d 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/AbstractClientTest.java
@@ -5,6 +5,7 @@
import com.marklogic.client.DatabaseClient;
import com.marklogic.junit5.AbstractMarkLogicTest;
+import org.junit.jupiter.api.AfterEach;
/**
* Intended to be the base class for all future client API tests, as it properly prepares the database by deleting
@@ -25,4 +26,11 @@ protected final String getJavascriptForDeletingDocumentsBeforeTestRuns() {
.toArray().forEach(item => xdmp.documentDelete(item))
""";
}
+
+ @AfterEach
+ void releaseClient() {
+ if (Common.client != null) {
+ Common.client.release();
+ }
+ }
}