diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a8b50b5bf23e..ae1e6fb27545 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons. |`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0| |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| -|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`| +|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks.
Set to `local` to store segment files in the local storage of the Middle Manager or Indexer.
Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid automatically cleans up shuffle data from deep storage when the parallel indexing task completes.|`local`| |`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`| The amount of direct memory needed by Druid is at least diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index c4f37192ee11..6803b542279e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -74,6 +74,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.segment.realtime.ChatHandlers; @@ -84,6 +85,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.BuildingShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import 
org.apache.druid.timeline.partition.PartitionBoundaries; @@ -210,6 +212,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask private Long segmentsPublished; private final boolean isCompactionTask; + @Nullable + private Map<String, GeneratedPartitionsReport> deepStorageShuffleReports; + @JsonCreator public ParallelIndexSupervisorTask( @JsonProperty("id") String id, @@ -812,6 +817,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except ); return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // 2. Partial segment merge phase @@ -920,6 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep return TaskStatus.failure(getId(), errMsg); } + deepStorageShuffleReports = indexingRunner.getReports(); indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // partition (interval, partitionId) -> partition locations @@ -957,6 +964,62 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep return taskStatus; } + /** + * Cleans up deep storage shuffle data produced during phase 1 of multi-phase parallel indexing. + *

+ * Cleanup is performed here in the supervisor task rather than in + * {@link org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager} because of + * the process model: phase-1 sub-tasks run as separate peon processes that exit before phase 2 + * starts. Each sub-task's DeepStorageIntermediaryDataManager instance is destroyed when the peon + * exits, so no surviving manager instance has knowledge of what files were pushed. The supervisor + * task is the only entity that is both alive after phase 2 completes and has the complete set of + * loadSpecs (collected from all sub-task reports). + *

+ * This method constructs minimal {@link DataSegment} objects from {@link DeepStoragePartitionStat} loadSpecs and + * delegates deletion to the appropriate storage-specific {@link DataSegmentKiller}. + * + * @param killer the segment killer from {@link TaskToolbox#getDataSegmentKiller()}. + * @param reports phase-1 sub-task reports containing partition stats with loadSpecs, + * may be null or empty if phase 1 produced no output. + */ + @VisibleForTesting + static void cleanupDeepStorageShuffleData( + DataSegmentKiller killer, + String datasource, + @Nullable Map<String, GeneratedPartitionsReport> reports + ) + { + if (reports == null || reports.isEmpty()) { + return; + } + + final List<DataSegment> segmentsToKill = new ArrayList<>(); + for (final GeneratedPartitionsReport report : reports.values()) { + for (final PartitionStat stat : report.getPartitionStats()) { + if (stat instanceof DeepStoragePartitionStat deepStorageStat) { + segmentsToKill.add( + DataSegment.builder( + SegmentId.of(datasource, deepStorageStat.getInterval(), "shuffle_data", deepStorageStat.getBucketId()) + ).loadSpec(deepStorageStat.getLoadSpec()).build() + ); + } + } + } + + if (segmentsToKill.isEmpty()) { + return; + } + + LOG.info("Cleaning up [%d] deep storage shuffle files for datasource[%s].", segmentsToKill.size(), datasource); + try { + killer.kill(segmentsToKill); + } + catch (Exception e) { + // Cleanup failures must never cause the overall indexing task to fail. 
+ LOG.warn(e, "Failed to clean up deep storage shuffle data files for datasource[%s]", datasource); + } + } + private static Map mergeCardinalityReports(Collection reports) { Map finalCollectors = new HashMap<>(); @@ -1835,6 +1898,8 @@ private Pair, Map> doGetRowStatsAndUnparseab @Override public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { + cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports); + if (!isCompactionTask) { super.cleanUp(toolbox, taskStatus); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java index 34199fc9e230..a6e71172c3ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java @@ -95,9 +95,21 @@ public Optional findPartitionFile(String supervisorTaskId, String su throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec"); } + /** + * Not implemented for deep storage mode. Unlike {@link LocalIntermediaryDataManager}, + * which can walk the local filesystem to find and delete files by supervisorTaskId, + * this manager has no way to discover what files were pushed: it has no + * {@link org.apache.druid.segment.loading.DataSegmentKiller}, does not track pushed + * paths, and runs on short-lived peon processes whose state is lost on exit. + *

+ * Deep storage shuffle cleanup is handled by + * {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanupDeepStorageShuffleData}. + */ @Override public void deletePartitions(String supervisorTaskId) { - throw new UnsupportedOperationException("Not supported"); + throw new UnsupportedOperationException( + "Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager" + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 4652ecc5444c..40f111c452e7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -45,7 +45,10 @@ import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.loading.DataSegmentKiller; +import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; @@ -551,4 +554,163 @@ private void verifyPartitionIdAndLocations( } } + public static class CleanupTest + { + private static final String TEST_DATASOURCE = "test-datasource"; + private static final Interval DAY1 = Intervals.of("2024-01-01/2024-01-02"); + private static final Interval DAY2 = 
Intervals.of("2024-01-02/2024-01-03"); + + @Test + public void testCleanupWithDeepStoragePartitionStats() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map<String, GeneratedPartitionsReport> reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1"), + createDeepStoragePartitionStat(DAY1, 1, "s3", "bucket-1", "key-2") + ), null)); + reports.put("subtask-2", new GeneratedPartitionsReport("subtask-2", List.of( + createDeepStoragePartitionStat(DAY2, 0, "s3", "bucket-1", "key-3") + ), null)); + + killer.kill(EasyMock.<List<DataSegment>>anyObject()); + EasyMock.expectLastCall().andAnswer(() -> { + final List<DataSegment> segments = (List<DataSegment>) EasyMock.getCurrentArguments()[0]; + Assert.assertEquals(3, segments.size()); + final Set<Object> keys = segments.stream() + .map(s -> s.getLoadSpec().get("key")) + .collect(Collectors.toSet()); + Assert.assertEquals( + new HashSet<>(Arrays.asList("key-1", "key-2", "key-3")), + keys + ); + return null; + }); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupWithLocalPartitionStats() + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + EasyMock.replay(killer); + + final Map<String, GeneratedPartitionsReport> reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList( + createLocalPartitionStat(DAY1, 0), + createLocalPartitionStat(DAY1, 1) + ), null)); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupWithMixedPartitionStats() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map<String, GeneratedPartitionsReport> reports = new HashMap<>(); + reports.put("subtask-1", new 
GeneratedPartitionsReport("subtask-1", Arrays.asList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-deep"), + createLocalPartitionStat(DAY1, 1) + ), null)); + + killer.kill(EasyMock.<List<DataSegment>>anyObject()); + EasyMock.expectLastCall().andAnswer(() -> { + final List<DataSegment> segments = (List<DataSegment>) EasyMock.getCurrentArguments()[0]; + Assert.assertEquals(1, segments.size()); + Assert.assertEquals("key-deep", segments.get(0).getLoadSpec().get("key")); + return null; + }); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupDoesNotFailWhenKillerThrows() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map<String, GeneratedPartitionsReport> reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1") + ), null)); + + killer.kill(EasyMock.<List<DataSegment>>anyObject()); + EasyMock.expectLastCall().andThrow(new SegmentLoadingException("Simulated failure")); + EasyMock.replay(killer); + + ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + + EasyMock.verify(killer); + } + + @Test + public void testCleanupCalledEvenWhenPhase2Fails() throws Exception + { + final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class); + + final Map<String, GeneratedPartitionsReport> reports = new HashMap<>(); + reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList( + createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1") + ), null)); + + killer.kill(EasyMock.<List<DataSegment>>anyObject()); + EasyMock.expectLastCall(); + EasyMock.replay(killer); + + boolean phase2Failed; + try { + throw new RuntimeException("Simulated phase 2 failure"); + } + catch (RuntimeException e) { + phase2Failed = true; + } + finally { + 
ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports); + } + + Assert.assertTrue("Phase 2 should have failed", phase2Failed); + EasyMock.verify(killer); + } + + private static DeepStoragePartitionStat createDeepStoragePartitionStat( + Interval interval, + int bucketId, + String type, + String bucket, + String key + ) + { + return new DeepStoragePartitionStat( + interval, + new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null), + ImmutableMap.of("type", type, "bucket", bucket, "key", key) + ); + } + + private static GenericPartitionStat createLocalPartitionStat(Interval interval, int bucketId) + { + return new GenericPartitionStat( + "host", + 8080, + false, + interval, + new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null), + null, + null + ); + } + } + }