From deb45924142aad964d8df65927d4286cb3169501 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 7 Jan 2026 21:58:53 +0800 Subject: [PATCH] [core] Rollback 'COMPACT' commit for row-level operations --- .../org/apache/paimon/AbstractFileStore.java | 10 +- .../apache/paimon/catalog/TableRollback.java | 29 +++++ .../paimon/operation/FileStoreCommitImpl.java | 65 +++++++--- .../operation/commit/CommitRollback.java | 45 +++++++ .../operation/commit/ConflictDetection.java | 61 +++++---- .../operation/commit/RetryCommitResult.java | 44 +++++-- .../paimon/table/CatalogEnvironment.java | 16 +++ .../paimon/operation/FileDeletionTest.java | 1 + .../paimon/operation/FileStoreCommitTest.java | 4 +- .../apache/paimon/rest/RESTCatalogTest.java | 117 ++++++++++++++++++ 10 files changed, 346 insertions(+), 46 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 33a60e73d7d9..4f8bfb68e981 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.catalog.TableRollback; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.FileIO; @@ -43,6 +44,7 @@ import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.operation.commit.CommitRollback; import org.apache.paimon.operation.commit.ConflictDetection; import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.partition.PartitionExpireStrategy; @@ -288,6 +290,11 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { commitUser, this::newScan, options.commitStrictModeLastSafeSnapshot().orElse(null)); + CommitRollback rollback = null; + TableRollback tableRollback = catalogEnvironment.catalogTableRollback(); + if (tableRollback != null) { + rollback = new CommitRollback(tableRollback); + } return new FileStoreCommitImpl( snapshotCommit, fileIO, @@ -320,7 +327,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { options.rowTrackingEnabled(), options.commitDiscardDuplicateFiles(), conflictDetection, - strictModeChecker); + strictModeChecker, + rollback); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java new file mode 100644 index 000000000000..5cb2a521924d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.table.Instant; + +import javax.annotation.Nullable; + +/** Rollback table to instant from snapshot. */ +public interface TableRollback { + + void rollbackTo(Instant instant, @Nullable Long fromSnapshot); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index bdcb4482d08c..6494b6185513 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -43,10 +43,12 @@ import org.apache.paimon.operation.commit.CommitChangesProvider; import org.apache.paimon.operation.commit.CommitCleaner; import org.apache.paimon.operation.commit.CommitResult; +import org.apache.paimon.operation.commit.CommitRollback; import org.apache.paimon.operation.commit.CommitScanner; import org.apache.paimon.operation.commit.ConflictDetection; import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; +import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult; import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned; import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.operation.commit.SuccessCommitResult; @@ -138,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final ManifestFile manifestFile; private final ManifestList manifestList; private final IndexManifestFile indexManifestFile; + @Nullable private final CommitRollback rollback; private final CommitScanner scanner; private final int numBucket; private final MemorySize manifestTargetSize; @@ -195,7 +198,8 @@ public FileStoreCommitImpl( boolean rowTrackingEnabled, boolean discardDuplicateFiles, ConflictDetection conflictDetection, - @Nullable StrictModeChecker strictModeChecker) { + @Nullable StrictModeChecker strictModeChecker, + @Nullable CommitRollback rollback) { this.snapshotCommit = snapshotCommit; this.fileIO = fileIO; this.schemaManager = schemaManager; @@ -209,6 +213,7 @@ public FileStoreCommitImpl( this.manifestFile = manifestFileFactory.create(); this.manifestList = manifestListFactory.create(); this.indexManifestFile = indexManifestFileFactory.create(); + this.rollback = rollback; this.scanner = new CommitScanner(scan, indexManifestFile, options); this.numBucket = numBucket; this.manifestTargetSize = manifestTargetSize; @@ -313,10 +318,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { if (appendCommitCheckConflict) { checkAppendFiles = true; } + + boolean allowRollback = false; if (containsFileDeletionOrDeletionVectors( appendSimpleEntries, changes.appendIndexFiles)) { commitKind = CommitKind.OVERWRITE; checkAppendFiles = true; + allowRollback = true; } attempts += @@ -329,6 +337,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { committable.watermark(), committable.properties(), commitKind, + allowRollback, checkAppendFiles, null); generatedSnapshot += 1; @@ -347,6 +356,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) { committable.watermark(), committable.properties(), CommitKind.COMPACT, + false, true, null); generatedSnapshot += 1; @@ -512,6 +522,7 @@ public int overwritePartition( committable.watermark(), committable.properties(), CommitKind.COMPACT, + false, true, null); generatedSnapshot += 1; @@ -652,6 +663,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) { Collections.emptyMap(), CommitKind.ANALYZE, false, + false, statsFileName); } @@ -678,6 +690,7 @@ private int tryCommit( @Nullable Long watermark, Map properties, CommitKind commitKind, + boolean allowRollback, boolean detectConflicts, @Nullable String statsFileName) { int retryCount = 0; @@ -696,6 +709,7 @@ private int tryCommit( watermark, properties, commitKind, + allowRollback, latestSnapshot, detectConflicts, statsFileName); @@ -742,6 +756,7 @@ private int tryOverwritePartition( watermark, properties, CommitKind.OVERWRITE, + false, true, null); } @@ -756,6 +771,7 @@ CommitResult tryCommitOnce( @Nullable Long watermark, Map properties, CommitKind commitKind, + boolean allowRollback, @Nullable Snapshot latestSnapshot, boolean detectConflicts, @Nullable String newStatsFileName) { @@ -763,13 +779,15 @@ CommitResult tryCommitOnce( // Check if the commit has been completed. At this point, there will be no more repeated // commits and just return success - if (retryResult != null && latestSnapshot != null) { + if (retryResult instanceof CommitFailRetryResult && latestSnapshot != null) { + CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) retryResult; Map snapshotCache = new HashMap<>(); snapshotCache.put(latestSnapshot.id(), latestSnapshot); long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID; - if (retryResult.latestSnapshot != null) { - snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot); - startCheckSnapshot = retryResult.latestSnapshot.id() + 1; + if (commitFailRetry.latestSnapshot != null) { + snapshotCache.put( + commitFailRetry.latestSnapshot.id(), commitFailRetry.latestSnapshot); + startCheckSnapshot = commitFailRetry.latestSnapshot.id() + 1; } for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) { Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot); @@ -813,11 +831,17 @@ CommitResult tryCommitOnce( // latestSnapshotId is different from the snapshot id we've checked for conflicts, // so we have to check again List changedPartitions = changedPartitions(deltaFiles, indexFiles); - if (retryResult != null && retryResult.latestSnapshot != null) { - baseDataFiles = new ArrayList<>(retryResult.baseDataFiles); + CommitFailRetryResult commitFailRetry = + retryResult instanceof CommitFailRetryResult + ? (CommitFailRetryResult) retryResult + : null; + if (commitFailRetry != null + && commitFailRetry.latestSnapshot != null + && commitFailRetry.baseDataFiles != null) { + baseDataFiles = new ArrayList<>(commitFailRetry.baseDataFiles); List incremental = scanner.readIncrementalChanges( - retryResult.latestSnapshot, latestSnapshot, changedPartitions); + commitFailRetry.latestSnapshot, latestSnapshot, changedPartitions); if (!incremental.isEmpty()) { baseDataFiles.addAll(incremental); baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles)); @@ -837,12 +861,21 @@ CommitResult tryCommitOnce( .filter(entry -> !baseIdentifiers.contains(entry.identifier())) .collect(Collectors.toList()); } - conflictDetection.checkNoConflictsOrFail( - latestSnapshot, - baseDataFiles, - SimpleFileEntry.from(deltaFiles), - indexFiles, - commitKind); + Optional exception = + conflictDetection.checkConflicts( + latestSnapshot, + baseDataFiles, + SimpleFileEntry.from(deltaFiles), + indexFiles, + commitKind); + if (exception.isPresent()) { + if (allowRollback && rollback != null) { + if (rollback.tryToRollback(latestSnapshot)) { + return RetryCommitResult.forRollback(exception.get()); + } + } + throw exception.get(); + } } Snapshot newSnapshot; @@ -971,7 +1004,7 @@ CommitResult tryCommitOnce( } catch (Exception e) { // commit exception, not sure about the situation and should not clean up the files LOG.warn("Retry commit for exception.", e); - return new RetryCommitResult(latestSnapshot, baseDataFiles, e); + return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e); } if (!success) { @@ -988,7 +1021,7 @@ CommitResult tryCommitOnce( commitTime); commitCleaner.cleanUpNoReuseTmpManifests( baseManifestList, mergeBeforeManifests, mergeAfterManifests); - return new RetryCommitResult(latestSnapshot, baseDataFiles, null); + return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, null); } LOG.info( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java new file mode 100644 index 000000000000..683b6555a651 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.TableRollback; +import org.apache.paimon.table.Instant; + +/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */ +public class CommitRollback { + + private final TableRollback tableRollback; + + public CommitRollback(TableRollback tableRollback) { + this.tableRollback = tableRollback; + } + + public boolean tryToRollback(Snapshot latestSnapshot) { + if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) { + long latest = latestSnapshot.id(); + try { + tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest); + return true; + } catch (Exception ignored) { + } + } + return false; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index 82d01dfdc5d7..990b47f0f6ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -102,7 +103,7 @@ public void withPartitionExpire(PartitionExpire partitionExpire) { this.partitionExpire = partitionExpire; } - public void checkNoConflictsOrFail( + public Optional checkConflicts( Snapshot snapshot, List baseEntries, List deltaEntries, @@ -126,14 +127,20 @@ public void checkNoConflictsOrFail( deltaEntries = buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries); } catch (Throwable e) { - throw conflictException(commitUser, baseEntries, deltaEntries).apply(e); + return Optional.of( + conflictException(commitUser, baseEntries, deltaEntries).apply(e)); } } List allEntries = new ArrayList<>(baseEntries); allEntries.addAll(deltaEntries); - checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser); + Optional exception = + checkBucketKeepSame( + baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser); + if (exception.isPresent()) { + return exception; + } Function conflictException = conflictException(baseCommitUser, baseEntries, deltaEntries); @@ -151,21 +158,24 @@ public void checkNoConflictsOrFail( // merge manifest entries and also check if the files we want to delete are still there mergedEntries = FileEntry.mergeEntries(allEntries); } catch (Throwable e) { - throw conflictException.apply(e); + return Optional.of(conflictException.apply(e)); } - checkNoDeleteInMergedEntries(mergedEntries, conflictException); - checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, baseCommitUser); + exception = checkDeleteInEntries(mergedEntries, conflictException); + if (exception.isPresent()) { + return exception; + } + return checkKeyRange(baseEntries, deltaEntries, mergedEntries, baseCommitUser); } - private void checkBucketKeepSame( + private Optional checkBucketKeepSame( List baseEntries, List deltaEntries, CommitKind commitKind, List allEntries, String baseCommitUser) { if (commitKind == CommitKind.OVERWRITE) { - return; + return Optional.empty(); } // total buckets within the same partition should remain the same @@ -199,18 +209,19 @@ private void checkBucketKeepSame( deltaEntries, null); LOG.warn("", conflictException.getLeft()); - throw conflictException.getRight(); + return Optional.of(conflictException.getRight()); } + return Optional.empty(); } - private void checkKeyRangeNoConflicts( + private Optional checkKeyRange( List baseEntries, List deltaEntries, Collection mergedEntries, String baseCommitUser) { // fast exit for file store without keys if (keyComparator == null) { - return; + return Optional.empty(); } // group entries by partitions, buckets and levels @@ -244,10 +255,11 @@ private void checkKeyRangeNoConflicts( null); LOG.warn("", conflictException.getLeft()); - throw conflictException.getRight(); + return Optional.of(conflictException.getRight()); } } } + return Optional.empty(); } private Function conflictException( @@ -271,7 +283,7 @@ private boolean checkForDeletionVector() { return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE); } - private void checkNoDeleteInMergedEntries( + private Optional checkDeleteInEntries( Collection mergedEntries, Function exceptionFunction) { try { @@ -283,12 +295,17 @@ private void checkNoDeleteInMergedEntries( tableName); } } catch (Throwable e) { - assertConflictForPartitionExpire(mergedEntries); - throw exceptionFunction.apply(e); + Optional exception = assertConflictForPartitionExpire(mergedEntries); + if (exception.isPresent()) { + return exception; + } + return Optional.of(exceptionFunction.apply(e)); } + return Optional.empty(); } - private void assertConflictForPartitionExpire(Collection mergedEntries) { + private Optional assertConflictForPartitionExpire( + Collection mergedEntries) { if (partitionExpire != null && partitionExpire.isValueExpiration()) { Set deletedPartitions = new HashSet<>(); for (SimpleFileEntry entry : mergedEntries) { @@ -304,13 +321,15 @@ private void assertConflictForPartitionExpire(Collection merged partToSimpleString( partitionType, partition, "-", 200)) .collect(Collectors.toList()); - throw new RuntimeException( - "You are writing data to expired partitions, and you can filter this data to avoid job failover." - + " Otherwise, continuous expired records will cause the job to failover restart continuously." - + " Expired partitions are: " - + expiredPartitions); + return Optional.of( + new RuntimeException( + "You are writing data to expired partitions, and you can filter this data to avoid job failover." + + " Otherwise, continuous expired records will cause the job to failover restart continuously." + + " Expired partitions are: " + + expiredPartitions)); } } + return Optional.empty(); } static List buildBaseEntriesWithDV( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java index e64049ea63c2..b9e0ab2a2ef0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java @@ -21,24 +21,54 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.SimpleFileEntry; +import javax.annotation.Nullable; + import java.util.List; /** Need to retry commit of {@link CommitResult}. */ -public class RetryCommitResult implements CommitResult { +public abstract class RetryCommitResult implements CommitResult { - public final Snapshot latestSnapshot; - public final List baseDataFiles; public final Exception exception; - public RetryCommitResult( - Snapshot latestSnapshot, List baseDataFiles, Exception exception) { - this.latestSnapshot = latestSnapshot; - this.baseDataFiles = baseDataFiles; + private RetryCommitResult(Exception exception) { this.exception = exception; } + public static RetryCommitResult forCommitFail( + Snapshot snapshot, List baseDataFiles, Exception exception) { + return new CommitFailRetryResult(snapshot, baseDataFiles, exception); + } + + public static RetryCommitResult forRollback(Exception exception) { + return new RollbackRetryResult(exception); + } + @Override public boolean isSuccess() { return false; } + + /** Retry result for commit failing. */ + public static class CommitFailRetryResult extends RetryCommitResult { + + public final @Nullable Snapshot latestSnapshot; + public final @Nullable List baseDataFiles; + + private CommitFailRetryResult( + @Nullable Snapshot latestSnapshot, + @Nullable List baseDataFiles, + Exception exception) { + super(exception); + this.latestSnapshot = latestSnapshot; + this.baseDataFiles = baseDataFiles; + } + } + + /** Retry result for rollback. */ + public static class RollbackRetryResult extends RetryCommitResult { + + private RollbackRetryResult(Exception exception) { + super(exception); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index a0a23d8ca4d0..8f68d3e04a74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -28,6 +28,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.catalog.TableRollback; import org.apache.paimon.operation.Lock; import org.apache.paimon.table.source.TableQueryAuth; import org.apache.paimon.tag.SnapshotLoaderImpl; @@ -112,6 +113,21 @@ public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) { return snapshotCommit; } + @Nullable + public TableRollback catalogTableRollback() { + if (catalogLoader != null && supportsVersionManagement) { + Catalog catalog = catalogLoader.load(); + return (instant, fromSnapshot) -> { + try { + catalog.rollbackTo(identifier, instant, fromSnapshot); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + }; + } + return null; + } + @Nullable public SnapshotLoader snapshotLoader() { if (catalogLoader == null) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 69812952bfad..580662607941 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -929,6 +929,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { null, Collections.emptyMap(), Snapshot.CommitKind.APPEND, + false, store.snapshotManager().latestSnapshot(), true, null); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 75a8271ae16d..322920fc7ea6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1019,12 +1019,13 @@ public void testCommitTwiceWithDifferentKind() throws Exception { null, Collections.emptyMap(), Snapshot.CommitKind.APPEND, + false, firstLatest, true, null); // Compact commit.tryCommitOnce( - new RetryCommitResult(firstLatest, Collections.emptyList(), null), + RetryCommitResult.forCommitFail(firstLatest, Collections.emptyList(), null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -1032,6 +1033,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception { null, Collections.emptyMap(), Snapshot.CommitKind.COMPACT, + false, store.snapshotManager().latestSnapshot(), true, null); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 300a94287e3b..a1b00b502af3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.PagedList; import org.apache.paimon.Snapshot; import org.apache.paimon.TableType; +import org.apache.paimon.append.AppendCompactTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; @@ -37,6 +38,11 @@ import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionChange; import org.apache.paimon.function.FunctionDefinition; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.operation.BaseAppendFileStoreWrite; +import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; @@ -71,8 +77,10 @@ import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; @@ -111,6 +119,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; @@ -120,6 +129,7 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.OBJECT_TABLE; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; +import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; @@ -3420,6 +3430,113 @@ void testColumnMaskingAndRowFilter() throws Exception { assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob", "Charlie", "David"); } + @Test + public void testConflictRollback() throws Exception { + doTestConflictRollback(false); + } + + @Test + public void testConflictRollbackFail() throws Exception { + doTestConflictRollback(true); + } + + private void doTestConflictRollback(boolean insertMiddle) throws Exception { + Identifier identifier = + Identifier.create("test_conflict_rollback", "test_conflict_rollback"); + catalog.createDatabase(identifier.getDatabaseName(), true); + catalog.createTable( + identifier, + new Schema( + Lists.newArrayList(new DataField(0, "col1", DataTypes.INT())), + emptyList(), + emptyList(), + new HashMap<>(), + ""), + true); + Table table = catalog.getTable(identifier); + + // write 5 files + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + List files = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(i)); + List commitMessages = write.prepareCommit(); + commit.commit(commitMessages); + DataFileMeta file = + ((CommitMessageImpl) commitMessages.get(0)) + .newFilesIncrement() + .newFiles() + .get(0); + files.add(file); + } + } + + // delete write + DataFileMeta file = files.get(0); + CommitMessageImpl deleteCommitMessage = + new CommitMessageImpl( + EMPTY_ROW, + 0, + -1, + new DataIncrement(emptyList(), singletonList(file), emptyList()), + new CompactIncrement(emptyList(), emptyList(), emptyList())); + + // compact write + CommitMessage compactCommitMessage; + try (BatchTableWrite write = writeBuilder.newWrite()) { + AppendCompactTask compactTask = new AppendCompactTask(EMPTY_ROW, files); + FileStoreWrite fileStoreWrite = ((TableWriteImpl) write).getWrite(); + compactCommitMessage = + compactTask.doCompact( + (FileStoreTable) table, (BaseAppendFileStoreWrite) fileStoreWrite); + } + + // do compact commit first + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(singletonList(compactCommitMessage)); + } + + if (insertMiddle) { + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(0)); + commit.commit(write.prepareCommit()); + } + } + + // do delete commit after + // expire snapshots first + SnapshotManager snapshotManager = ((FileStoreTable) table).snapshotManager(); + snapshotManager.deleteSnapshot(1); + snapshotManager.deleteSnapshot(2); + try (BatchTableCommit commit = writeBuilder.newCommit()) { + List messages = singletonList(deleteCommitMessage); + if (insertMiddle) { + assertThatThrownBy(() -> commit.commit(messages)) + .hasMessageContaining("File deletion conflicts detected"); + } else { + // should rollback compact commit + commit.commit(messages); + } + } + + // scan for rollback success + if (!insertMiddle) { + ReadBuilder readBuilder = table.newReadBuilder(); + List result = new ArrayList<>(); + readBuilder + .newRead() + .createReader(readBuilder.newScan().plan()) + .forEachRemaining(r -> result.add(r.getInt(0))); + assertThat(result).containsExactlyInAnyOrder(1, 2, 3, 4); + } + + // clear + catalog.dropDatabase(identifier.getDatabaseName(), false, true); + } + protected void createTable( Identifier identifier, Map options, List partitionKeys) throws Exception {