From bd24bcc734ad8577aee1d0b0540db84aa31b3ea6 Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 18 Mar 2026 14:05:47 +0800 Subject: [PATCH 1/8] Shuffle solution Add delete for datasegments Unit Tests and Embedded tests Cleanup --- docs/configuration/index.md | 2 +- .../indexing/IndexParallelTaskTest.java | 28 +++ .../parallel/ParallelIndexSupervisorTask.java | 73 +++++++- .../DeepStorageIntermediaryDataManager.java | 16 +- .../ParallelIndexSupervisorTaskTest.java | 162 ++++++++++++++++++ 5 files changed, 278 insertions(+), 3 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a8b50b5bf23e..4e0467f6f081 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons. |`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0| |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| -|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`| +|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid automatically cleans up shuffle data from deep storage when the parallel indexing task completes, regardless of success or failure. If the supervisor task itself crashes before cleanup completes, you may need to manually remove residual files or set up cloud storage lifecycle rules for the `shuffle-data` prefix.|`local`| |`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`| The amount of direct memory needed by Druid is at least diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java index 19ee1061a864..4e1527a3f804 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java @@ -30,6 +30,7 @@ import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.TaskBuilder; +import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -48,6 +49,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import java.io.File; import java.util.List; import java.util.Optional; @@ -206,6 +208,32 @@ public void test_runIndexTask_andAppendData(PartitionsSpec partitionsSpec) runGroupByQuery("Crimson Typhoon,2,1810.0,18100.0"); } + public static List getMultiPhasePartitionsSpecs() + { + return List.of( + new HashedPartitionsSpec(null, 2, null, null), + new SingleDimensionPartitionsSpec(2, null, "namespace", false) + ); + } + + @MethodSource("getMultiPhasePartitionsSpecs") + @ParameterizedTest(name = "partitionsSpec={0}") + public void test_shuffleDataIsCleanedUp_afterSuccessfulMultiPhaseTask(PartitionsSpec partitionsSpec) + { + final TaskBuilder.IndexParallel indexTask = buildIndexParallelTask(partitionsSpec, false); + runTask(indexTask, dataSource); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + final File deepStoreDir = cluster.getTestFolder().getOrCreateFolder("deep-store"); + final File shuffleDir = new File(deepStoreDir, DeepStorageIntermediaryDataManager.SHUFFLE_DATA_DIR_PREFIX); + + if (shuffleDir.exists()) { + final File[] remainingFiles = shuffleDir.listFiles(); + Assertions.assertNotNull(remainingFiles); + Assertions.assertEquals(0, remainingFiles.length); + } + } + /** * Creates a builder for an "index_parallel" task to ingest into {@link #dataSource}. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c4f37192ee11..be14c6197b06 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -77,6 +77,7 @@ import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.segment.realtime.ChatHandlers; +import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.server.security.Action; @@ -84,7 +85,9 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.BuildingShardSpec; +import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.apache.druid.utils.CollectionUtils; @@ -210,6 +213,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask private Long segmentsPublished; private final boolean isCompactionTask; + @Nullable + private volatile Map deepStorageShuffleReports; + @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, @@ -812,6 +818,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // 2. Partial segment merge phase @@ -919,7 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep ); return TaskStatus.failure(getId(), errMsg); } - + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // partition (interval, partitionId) -> partition locations @@ -957,6 +964,66 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep return taskStatus; } + /** + * Cleans up deep storage shuffle data produced during phase 1 of multi-phase parallel indexing. + * + *

Cleanup is performed here in the supervisor task rather than in + * {@link org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager} because of + * the process model: phase-1 sub-tasks run as separate peon processes that exit before phase 2 + * starts. Each sub-task's DeepStorageIntermediaryDataManager instance is destroyed when the peon + * exits, so no surviving manager instance has knowledge of what files were pushed. The supervisor + * task is the only entity that is both alive after phase 2 completes and has the complete set of + * loadSpecs (collected from all sub-task reports). + * + *

This method constructs minimal {@link DataSegment} objects from the + * {@link DeepStoragePartitionStat} loadSpecs and delegates deletion to + * {@link DataSegmentKiller}, which routes to the appropriate storage-specific + * killer (S3, HDFS, GCS, Azure, Local) via {@code OmniDataSegmentKiller}. + * + *

Exceptions are caught and logged as warnings. Cleanup failures must never + * cause the overall indexing task to fail. + * + * @param killer the segment killer from {@link TaskToolbox#getDataSegmentKiller()} + * @param reports phase-1 sub-task reports containing partition stats with loadSpecs, + * may be null or empty if phase 1 produced no output + */ + @VisibleForTesting + static void cleanupDeepStorageShuffleData( + DataSegmentKiller killer, + String datasource, + @Nullable Map reports + ) + { + if (reports == null || reports.isEmpty()) { + return; + } + + final List segmentsToKill = new ArrayList<>(); + for (final GeneratedPartitionsReport report : reports.values()) { + for (final PartitionStat stat : report.getPartitionStats()) { + if (stat instanceof DeepStoragePartitionStat deepStorageStat) { + segmentsToKill.add( + DataSegment.builder( + SegmentId.of(datasource, deepStorageStat.getInterval(), "shuffle_data", deepStorageStat.getBucketId()) + ).loadSpec(deepStorageStat.getLoadSpec()).build() + ); + } + } + } + + if (segmentsToKill.isEmpty()) { + return; + } + + LOG.info("Cleaning up [%d] deep storage shuffle files.", segmentsToKill.size()); + try { + killer.kill(segmentsToKill); + } + catch (Exception e) { + LOG.warn(e, "Failed to clean up deep storage shuffle data. Residual files may need manual cleanup."); + } + } + private static Map mergeCardinalityReports(Collection reports) { Map finalCollectors = new HashMap<>(); @@ -1835,6 +1902,10 @@ private Pair, Map> doGetRowStatsAndUnparseab @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { + // Shuffle data in deep storage is only needed for phase 2 to read. + // Clean up regardless of task outcome (success or failure). + cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports); + if (!isCompactionTask) { super.cleanUp(toolbox, taskStatus); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java index 34199fc9e230..e6d74a99483f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java @@ -95,9 +95,23 @@ public Optional findPartitionFile(String supervisorTaskId, String su throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec"); } + /** + * Not implemented for deep storage mode. Unlike {@link LocalIntermediaryDataManager}, + * which can walk the local filesystem to find and delete files by supervisorTaskId, + * this manager has no way to discover what files were pushed: it has no + * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed + * paths, and runs on short-lived peon processes whose state is lost on exit. + * + *

Deep storage shuffle cleanup is instead handled by + * {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanupDeepStorageShuffleData}, + * which uses the phase-1 reports (containing loadSpecs) to identify and + * delete files via {@link org.apache.druid.segment.loading.DataSegmentKiller}. + */ @Override public void deletePartitions(String supervisorTaskId) { - throw new UnsupportedOperationException("Not supported"); + throw new UnsupportedOperationException( + "Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager" + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 6d2b7eded2a5..fbe290aecfa9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -41,6 +41,8 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.loading.DataSegmentKiller; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; @@ -50,6 +52,7 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; @@ -618,4 +621,163 @@ private void verifyPartitionIdAndLocations( } } + public static class CleanupTest + { + private static final String TEST_DATASOURCE = "test-datasource"; + private static final Interval DAY1 = Intervals.of("2024-01-01/2024-01-02"); + private static final Interval DAY2 = Intervals.of("2024-01-02/2024-01-03"); + + @Test + public void testCleanupWithDeepStoragePartitionStats() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1"), + createDeepStoragePartitionStat(DAY1, 1, "s3", "bucket-1", "key-2") + ), null)); + reports.put("subtask-2", new GeneratedPartitionsReport("subtask-2", Arrays.asList( + createDeepStoragePartitionStat(DAY2, 0, "s3", "bucket-1", "key-3") + ), null)); + + killer.kill(EasyMock.>anyObject()); + EasyMock.expectLastCall().andAnswer(() -> { + final List segments = (List) EasyMock.getCurrentArguments()[0]; + Assert.assertEquals(3, segments.size()); + final Set keys = segments.stream() + .map(s -> s.getLoadSpec().get("key")) + .collect(Collectors.toSet()); + Assert.assertEquals( + new HashSet<>(Arrays.asList("key-1", "key-2", "key-3")), + keys + ); + return null; + }); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupWithLocalPartitionStats() + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + EasyMock.replay(killer); + + final Map reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList( + createLocalPartitionStat(DAY1, 0), + createLocalPartitionStat(DAY1, 1) + ), null)); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupWithMixedPartitionStats() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-deep"), + createLocalPartitionStat(DAY1, 1) + ), null)); + + killer.kill(EasyMock.>anyObject()); + EasyMock.expectLastCall().andAnswer(() -> { + final List segments = (List) EasyMock.getCurrentArguments()[0]; + Assert.assertEquals(1, segments.size()); + Assert.assertEquals("key-deep", segments.get(0).getLoadSpec().get("key")); + return null; + }); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupDoesNotFailWhenKillerThrows() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1") + ), null)); + + killer.kill(EasyMock.>anyObject()); + EasyMock.expectLastCall().andThrow(new SegmentLoadingException("Simulated failure")); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupCalledEvenWhenPhase2Fails() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1") + ), null)); + + killer.kill(EasyMock.>anyObject()); + EasyMock.expectLastCall(); + EasyMock.replay(killer); + + boolean phase2Failed = false; + try { + throw new RuntimeException("Simulated phase 2 failure"); + } + catch (RuntimeException e) { + phase2Failed = true; + } + finally { + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + } + + Assert.assertTrue("Phase 2 should have failed", phase2Failed); + EasyMock.verify(killer); + } + + private static DeepStoragePartitionStat createDeepStoragePartitionStat( + Interval interval, + int bucketId, + String type, + String bucket, + String key + ) + { + return new DeepStoragePartitionStat( + interval, + new DimensionRangeBucketShardSpec(bucketId, Arrays.asList("dim1"), null, null), + ImmutableMap.of("type", type, "bucket", bucket, "key", key) + ); + } + + private static GenericPartitionStat createLocalPartitionStat(Interval interval, int bucketId) + { + return new GenericPartitionStat( + "host", + 8080, + false, + interval, + new DimensionRangeBucketShardSpec(bucketId, Arrays.asList("dim1"), null, null), + null, + null + ); + } + } + } From 54723c23f7f7028b3bb7c869cc242508c9205ca3 Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 18 Mar 2026 17:04:22 +0800 Subject: [PATCH 2/8] Better task logs when fail killing. --- docs/configuration/index.md | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 24 ++++++++----------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4e0467f6f081..ae1e6fb27545 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons. |`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0| |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| -|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid automatically cleans up shuffle data from deep storage when the parallel indexing task completes, regardless of success or failure. If the supervisor task itself crashes before cleanup completes, you may need to manually remove residual files or set up cloud storage lifecycle rules for the `shuffle-data` prefix.|`local`| +|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid automatically cleans up shuffle data from deep storage when the parallel indexing task completes.|`local`| |`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`| The amount of direct memory needed by Druid is at least diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index be14c6197b06..c23e02707e0e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -966,26 +966,21 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep /** * Cleans up deep storage shuffle data produced during phase 1 of multi-phase parallel indexing. - * - *

Cleanup is performed here in the supervisor task rather than in + *

+ * Cleanup is performed here in the supervisor task rather than in * {@link org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager} because of * the process model: phase-1 sub-tasks run as separate peon processes that exit before phase 2 * starts. Each sub-task's DeepStorageIntermediaryDataManager instance is destroyed when the peon * exits, so no surviving manager instance has knowledge of what files were pushed. The supervisor * task is the only entity that is both alive after phase 2 completes and has the complete set of * loadSpecs (collected from all sub-task reports). + *

+ * This method constructs minimal {@link DataSegment} objects from {@link DeepStoragePartitionStat} loadSpecs and + * delegates deletion to the appropriate storage-specific {@link DataSegmentKiller}. * - *

This method constructs minimal {@link DataSegment} objects from the - * {@link DeepStoragePartitionStat} loadSpecs and delegates deletion to - * {@link DataSegmentKiller}, which routes to the appropriate storage-specific - * killer (S3, HDFS, GCS, Azure, Local) via {@code OmniDataSegmentKiller}. - * - *

Exceptions are caught and logged as warnings. Cleanup failures must never - * cause the overall indexing task to fail. - * - * @param killer the segment killer from {@link TaskToolbox#getDataSegmentKiller()} + * @param killer the segment killer from {@link TaskToolbox#getDataSegmentKiller()}. * @param reports phase-1 sub-task reports containing partition stats with loadSpecs, - * may be null or empty if phase 1 produced no output + * may be null or empty if phase 1 produced no output. */ @VisibleForTesting static void cleanupDeepStorageShuffleData( @@ -1015,12 +1010,13 @@ static void cleanupDeepStorageShuffleData( return; } - LOG.info("Cleaning up [%d] deep storage shuffle files.", segmentsToKill.size()); + LOG.info("Cleaning up [%d] deep storage shuffle files for datasource[%s].", segmentsToKill.size(), datasource); try { killer.kill(segmentsToKill); } catch (Exception e) { - LOG.warn(e, "Failed to clean up deep storage shuffle data. Residual files may need manual cleanup."); + // Cleanup failures must never cause the overall indexing task to fail. + LOG.warn(e, "Failed to clean up deep storage shuffle data files for datasource[%s]", datasource); } } From 4e72f17bcb072ebee39b014161faa402bff3077d Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 18 Mar 2026 17:17:42 +0800 Subject: [PATCH 3/8] Docs --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 2 -- .../shuffle/DeepStorageIntermediaryDataManager.java | 8 +++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c23e02707e0e..0391a78e8fe8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1898,8 +1898,6 @@ private Pair, Map> doGetRowStatsAndUnparseab @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { - // Shuffle data in deep storage is only needed for phase 2 to read. - // Clean up regardless of task outcome (success or failure). cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports); if (!isCompactionTask) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java index e6d74a99483f..a6e71172c3ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java @@ -101,11 +101,9 @@ public Optional findPartitionFile(String supervisorTaskId, String su * this manager has no way to discover what files were pushed: it has no * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed * paths, and runs on short-lived peon processes whose state is lost on exit. - * - *

Deep storage shuffle cleanup is instead handled by - * {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanupDeepStorageShuffleData}, - * which uses the phase-1 reports (containing loadSpecs) to identify and - * delete files via {@link org.apache.druid.segment.loading.DataSegmentKiller}. + *

+ * Deep storage shuffle cleanup is handled by + * {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanupDeepStorageShuffleData}. */ @Override public void deletePartitions(String supervisorTaskId) From b8a3cc385065cc208e50be8bb8dd134a89b4a322 Mon Sep 17 00:00:00 2001 From: GWphua Date: Wed, 18 Mar 2026 17:48:16 +0800 Subject: [PATCH 4/8] Checkstyle --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 3 +-- .../task/batch/parallel/ParallelIndexSupervisorTaskTest.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 0391a78e8fe8..396173de4913 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -74,10 +74,10 @@ import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.segment.realtime.ChatHandlers; -import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.server.security.Action; @@ -87,7 +87,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.BuildingShardSpec; -import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionBoundaries; import org.apache.druid.utils.CollectionUtils; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index fbe290aecfa9..22a3249d0f3b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -41,8 +41,6 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.segment.loading.DataSegmentKiller; -import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.indexing.OverlordClient; @@ -51,6 +49,8 @@ import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.loading.DataSegmentKiller; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; From 55d3db849d953dda2ef34729e5c895eb61ed1a27 Mon Sep 17 00:00:00 2001 From: GWphua Date: Thu, 19 Mar 2026 18:57:56 +0800 Subject: [PATCH 5/8] Cleanup function --- .../indexing/IndexParallelTaskTest.java | 20 ------------------- .../parallel/ParallelIndexSupervisorTask.java | 8 ++------ 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java index 4e1527a3f804..75858801a32e 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java @@ -30,7 +30,6 @@ import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.TaskBuilder; -import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -49,7 +48,6 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; -import java.io.File; import java.util.List; import java.util.Optional; @@ -216,24 +214,6 @@ public static List getMultiPhasePartitionsSpecs() ); } - @MethodSource("getMultiPhasePartitionsSpecs") - @ParameterizedTest(name = "partitionsSpec={0}") - public void test_shuffleDataIsCleanedUp_afterSuccessfulMultiPhaseTask(PartitionsSpec partitionsSpec) - { - final TaskBuilder.IndexParallel indexTask = buildIndexParallelTask(partitionsSpec, false); - runTask(indexTask, dataSource); - cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); - - final File deepStoreDir = cluster.getTestFolder().getOrCreateFolder("deep-store"); - final File shuffleDir = new File(deepStoreDir, DeepStorageIntermediaryDataManager.SHUFFLE_DATA_DIR_PREFIX); - - if (shuffleDir.exists()) { - final File[] remainingFiles = shuffleDir.listFiles(); - Assertions.assertNotNull(remainingFiles); - Assertions.assertEquals(0, remainingFiles.length); - } - } - /** * Creates a builder for an "index_parallel" task to ingest into {@link #dataSource}. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 396173de4913..6be816bc9bbd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -212,9 +212,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask private Long segmentsPublished; private final boolean isCompactionTask; - @Nullable - private volatile Map deepStorageShuffleReports; - @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, @@ -817,7 +814,6 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); return TaskStatus.failure(getId(), errMsg); } - deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // 2. Partial segment merge phase @@ -925,7 +921,6 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep ); return TaskStatus.failure(getId(), errMsg); } - deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // partition (interval, partitionId) -> partition locations @@ -1897,7 +1892,8 @@ private Pair, Map> doGetRowStatsAndUnparseab @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { - cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports); + // TODO: Need to find a way to get the reports! + cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), null); if (!isCompactionTask) { super.cleanUp(toolbox, taskStatus); From 239341194f2b49088634809ae793a7c1b54aa116 Mon Sep 17 00:00:00 2001 From: GWphua Date: Thu, 19 Mar 2026 19:17:57 +0800 Subject: [PATCH 6/8] non-volatile --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 6be816bc9bbd..a86e81732b05 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -212,6 +212,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask private Long segmentsPublished; private final boolean isCompactionTask; + @Nullable + private Map deepStorageShuffleReports; + @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, @@ -814,6 +817,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // 2. Partial segment merge phase @@ -921,6 +925,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep ); return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // partition (interval, partitionId) -> partition locations @@ -1892,8 +1897,7 @@ private Pair, Map> doGetRowStatsAndUnparseab @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { - // TODO: Need to find a way to get the reports! - cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), null); + cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports); if (!isCompactionTask) { super.cleanUp(toolbox, taskStatus); From 220049b3e3141f70f62251f1a9acdac0ed8dd2ea Mon Sep 17 00:00:00 2001 From: GWphua Date: Thu, 19 Mar 2026 19:19:19 +0800 Subject: [PATCH 7/8] Clean up unnecessary code --- .../testing/embedded/indexing/IndexParallelTaskTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java index 75858801a32e..19ee1061a864 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java @@ -206,14 +206,6 @@ public void test_runIndexTask_andAppendData(PartitionsSpec partitionsSpec) runGroupByQuery("Crimson Typhoon,2,1810.0,18100.0"); } - public static List getMultiPhasePartitionsSpecs() - { - return List.of( - new HashedPartitionsSpec(null, 2, null, null), - new SingleDimensionPartitionsSpec(2, null, "namespace", false) - ); - } - /** * Creates a builder for an "index_parallel" task to ingest into {@link #dataSource}. */ From d63c8b0a0216d8f3b9a822f691845613e98addae Mon Sep 17 00:00:00 2001 From: GWphua Date: Fri, 20 Mar 2026 10:13:06 +0800 Subject: [PATCH 8/8] Test fixes --- .../task/batch/parallel/ParallelIndexSupervisorTask.java | 1 + .../batch/parallel/ParallelIndexSupervisorTaskTest.java | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index a86e81732b05..6803b542279e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -925,6 +925,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep ); return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 22a3249d0f3b..c0b9079723a9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -637,7 +637,7 @@ public void testCleanupWithDeepStoragePartitionStats() throws Exception createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1"), createDeepStoragePartitionStat(DAY1, 1, "s3", "bucket-1", "key-2") ), null)); - reports.put("subtask-2", new GeneratedPartitionsReport("subtask-2", Arrays.asList( + reports.put("subtask-2", new GeneratedPartitionsReport("subtask-2", List.of( createDeepStoragePartitionStat(DAY2, 0, "s3", "bucket-1", "key-3") ), null)); @@ -736,7 +736,7 @@ public void testCleanupCalledEvenWhenPhase2Fails() throws Exception EasyMock.expectLastCall(); EasyMock.replay(killer); - boolean phase2Failed = false; + boolean phase2Failed; try { throw new RuntimeException("Simulated phase 2 failure"); } @@ -761,7 +761,7 @@ private static DeepStoragePartitionStat createDeepStoragePartitionStat( { return new DeepStoragePartitionStat( interval, - new DimensionRangeBucketShardSpec(bucketId, Arrays.asList("dim1"), null, null), + new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null), ImmutableMap.of("type", type, "bucket", bucket, "key", key) ); } @@ -773,7 +773,7 @@ private static GenericPartitionStat createLocalPartitionStat(Interval interval, 8080, false, interval, - new DimensionRangeBucketShardSpec(bucketId, Arrays.asList("dim1"), null, null), + new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null), null, null );