Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts. Setting this value to `0` removes the ability to service per-segment timeouts, irrespective of `perSegmentTimeout` query context parameter. As these threads are just servicing timers, it's recommended to set this value to some small percent (e.g. 5%) of the total query processing cores available to the peon.|0|
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid does not support automated cleanup for the `shuffle-data` directory. You can set up cloud storage lifecycle rules for automated cleanup of data at the `shuffle-data` prefix location.|`local`|
|`druid.processing.intermediaryData.storage.type`|Storage type for intermediary segments of data shuffle between native parallel index tasks. <br />Set to `local` to store segment files in the local storage of the Middle Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for better fault tolerance during rolling updates. When the storage type is `deepstore`, Druid stores the data in the `shuffle-data` directory under the configured deep storage path. Druid automatically cleans up shuffle data from deep storage when the parallel indexing task completes.|`local`|
|`druid.processing.parallelPoolInit`|(EXPERIMENTAL) Allows all merge/processing memory pools to be allocated in parallel on process launch. This may significantly speed up Peon launch times if allocating several large buffers.|`false`|

The amount of direct memory needed by Druid is at least
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandlers;
Expand All @@ -84,6 +85,7 @@
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.BuildingShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionBoundaries;
Expand Down Expand Up @@ -210,6 +212,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask
private Long segmentsPublished;
private final boolean isCompactionTask;

@Nullable
private Map<String, GeneratedPartitionsReport> deepStorageShuffleReports;

@JsonCreator
public ParallelIndexSupervisorTask(
@JsonProperty("id") String id,
Expand Down Expand Up @@ -812,6 +817,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
);
return TaskStatus.failure(getId(), errMsg);
}
deepStorageShuffleReports = indexingRunner.getReports();
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// 2. Partial segment merge phase
Expand Down Expand Up @@ -920,6 +926,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
return TaskStatus.failure(getId(), errMsg);
}

deepStorageShuffleReports = indexingRunner.getReports();
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);

// partition (interval, partitionId) -> partition locations
Expand Down Expand Up @@ -957,6 +964,62 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
return taskStatus;
}

/**
* Cleans up deep storage shuffle data produced during phase 1 of multi-phase parallel indexing.
* <p>
* Cleanup is performed here in the supervisor task rather than in
* {@link org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager} because of
* the process model: phase-1 sub-tasks run as separate peon processes that exit before phase 2
* starts. Each sub-task's DeepStorageIntermediaryDataManager instance is destroyed when the peon
* exits, so no surviving manager instance has knowledge of what files were pushed. The supervisor
* task is the only entity that is both alive after phase 2 completes and has the complete set of
* loadSpecs (collected from all sub-task reports).
* <p>
* This method constructs minimal {@link DataSegment} objects from {@link DeepStoragePartitionStat} loadSpecs and
* delegates deletion to the appropriate storage-specific {@link DataSegmentKiller}.
*
* @param killer the segment killer from {@link TaskToolbox#getDataSegmentKiller()}.
* @param reports phase-1 sub-task reports containing partition stats with loadSpecs,
* may be null or empty if phase 1 produced no output.
*/
@VisibleForTesting
static void cleanupDeepStorageShuffleData(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the dir deleted?

DataSegmentKiller killer,
String datasource,
@Nullable Map<String, GeneratedPartitionsReport> reports
)
{
if (reports == null || reports.isEmpty()) {
return;
}

final List<DataSegment> segmentsToKill = new ArrayList<>();
for (final GeneratedPartitionsReport report : reports.values()) {
for (final PartitionStat stat : report.getPartitionStats()) {
if (stat instanceof DeepStoragePartitionStat deepStorageStat) {
segmentsToKill.add(
DataSegment.builder(
SegmentId.of(datasource, deepStorageStat.getInterval(), "shuffle_data", deepStorageStat.getBucketId())
).loadSpec(deepStorageStat.getLoadSpec()).build()
);
}
}
}

if (segmentsToKill.isEmpty()) {
return;
}

LOG.info("Cleaning up [%d] deep storage shuffle files for datasource[%s].", segmentsToKill.size(), datasource);
try {
killer.kill(segmentsToKill);
}
catch (Exception e) {
// Cleanup failures must never cause the overall indexing task to fail.
LOG.warn(e, "Failed to clean up deep storage shuffle data files for datasource[%s]", datasource);
}
}

private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
{
Map<Interval, Union> finalCollectors = new HashMap<>();
Expand Down Expand Up @@ -1835,6 +1898,8 @@ private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseab
@Override
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
cleanupDeepStorageShuffleData(toolbox.getDataSegmentKiller(), getDataSource(), deepStorageShuffleReports);

if (!isCompactionTask) {
super.cleanUp(toolbox, taskStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,21 @@ public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String su
throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec");
}

/**
 * Always throws: deep storage mode cannot implement per-supervisor deletion. In contrast to
 * {@link LocalIntermediaryDataManager}, which can scan its local filesystem for files belonging
 * to a supervisorTaskId, this manager keeps no record of pushed paths, holds no
 * {@link org.apache.druid.segment.loading.DataSegmentKiller}, and lives inside short-lived peon
 * processes whose in-memory state disappears on exit — so there is nothing it could enumerate
 * and delete.
 * <p>
 * Cleanup of deep storage shuffle data is instead performed by
 * {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask#cleanupDeepStorageShuffleData}.
 */
@Override
public void deletePartitions(String supervisorTaskId)
{
  final String reason =
      "Deep storage shuffle cleanup is handled by ParallelIndexSupervisorTask, not by the data manager";
  throw new UnsupportedOperationException(reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
Expand Down Expand Up @@ -551,4 +554,163 @@ private void verifyPartitionIdAndLocations(
}
}

/**
 * Unit tests for {@code ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData}.
 * Uses EasyMock strict mocks of {@link DataSegmentKiller} so that any unexpected
 * kill() invocation (or a missing expected one) fails the test at verify() time.
 */
public static class CleanupTest
{
// Fixed fixtures shared across tests: one datasource and two adjacent day intervals.
private static final String TEST_DATASOURCE = "test-datasource";
private static final Interval DAY1 = Intervals.of("2024-01-01/2024-01-02");
private static final Interval DAY2 = Intervals.of("2024-01-02/2024-01-03");

/**
 * All partition stats are deep-storage stats: every loadSpec from every sub-task
 * report must be forwarded to the killer in a single kill() call.
 */
@Test
public void testCleanupWithDeepStoragePartitionStats() throws Exception
{
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);

// Two sub-task reports: three deep-storage partitions across two intervals.
final Map<String, GeneratedPartitionsReport> reports = new HashMap<>();
reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList(
createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1"),
createDeepStoragePartitionStat(DAY1, 1, "s3", "bucket-1", "key-2")
), null));
reports.put("subtask-2", new GeneratedPartitionsReport("subtask-2", List.of(
createDeepStoragePartitionStat(DAY2, 0, "s3", "bucket-1", "key-3")
), null));

// Capture the killed segments via andAnswer and assert all three loadSpec keys arrive.
killer.kill(EasyMock.<List<DataSegment>>anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
final List<DataSegment> segments = (List<DataSegment>) EasyMock.getCurrentArguments()[0];
Assert.assertEquals(3, segments.size());
final Set<Object> keys = segments.stream()
.map(s -> s.getLoadSpec().get("key"))
.collect(Collectors.toSet());
Assert.assertEquals(
new HashSet<>(Arrays.asList("key-1", "key-2", "key-3")),
keys
);
return null;
});
EasyMock.replay(killer);

ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports);

EasyMock.verify(killer);
}

/**
 * Only local (non-deep-storage) partition stats are present: the killer must not be
 * invoked at all. No expectations are recorded, so a strict mock fails on any call.
 */
@Test
public void testCleanupWithLocalPartitionStats()
{
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);
EasyMock.replay(killer);

final Map<String, GeneratedPartitionsReport> reports = new HashMap<>();
reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList(
createLocalPartitionStat(DAY1, 0),
createLocalPartitionStat(DAY1, 1)
), null));

ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports);

EasyMock.verify(killer);
}

/**
 * Mixed stats: only the deep-storage partition must be passed to the killer;
 * the local one is filtered out.
 */
@Test
public void testCleanupWithMixedPartitionStats() throws Exception
{
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);

final Map<String, GeneratedPartitionsReport> reports = new HashMap<>();
reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Arrays.asList(
createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-deep"),
createLocalPartitionStat(DAY1, 1)
), null));

// Expect exactly one segment, carrying the deep-storage loadSpec key.
killer.kill(EasyMock.<List<DataSegment>>anyObject());
EasyMock.expectLastCall().andAnswer(() -> {
final List<DataSegment> segments = (List<DataSegment>) EasyMock.getCurrentArguments()[0];
Assert.assertEquals(1, segments.size());
Assert.assertEquals("key-deep", segments.get(0).getLoadSpec().get("key"));
return null;
});
EasyMock.replay(killer);

ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports);

EasyMock.verify(killer);
}

/**
 * A killer failure is swallowed by cleanup: the method must return normally
 * (cleanup failures must never fail the indexing task).
 */
@Test
public void testCleanupDoesNotFailWhenKillerThrows() throws Exception
{
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);

final Map<String, GeneratedPartitionsReport> reports = new HashMap<>();
reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList(
createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1")
), null));

// kill() throws; cleanupDeepStorageShuffleData must catch and not propagate.
killer.kill(EasyMock.<List<DataSegment>>anyObject());
EasyMock.expectLastCall().andThrow(new SegmentLoadingException("Simulated failure"));
EasyMock.replay(killer);

ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports);

EasyMock.verify(killer);
}

/**
 * Simulates the cleanUp()-in-finally contract: even when phase 2 fails with an
 * exception, cleanup still runs and the killer still receives the shuffle segments.
 */
@Test
public void testCleanupCalledEvenWhenPhase2Fails() throws Exception
{
final DataSegmentKiller killer = EasyMock.createStrictMock(DataSegmentKiller.class);

final Map<String, GeneratedPartitionsReport> reports = new HashMap<>();
reports.put("subtask-1", new GeneratedPartitionsReport("subtask-1", Collections.singletonList(
createDeepStoragePartitionStat(DAY1, 0, "s3", "bucket-1", "key-1")
), null));

killer.kill(EasyMock.<List<DataSegment>>anyObject());
EasyMock.expectLastCall();
EasyMock.replay(killer);

// Throw/catch mimics a phase-2 failure; cleanup sits in finally as in production code.
boolean phase2Failed;
try {
throw new RuntimeException("Simulated phase 2 failure");
}
catch (RuntimeException e) {
phase2Failed = true;
}
finally {
ParallelIndexSupervisorTask.cleanupDeepStorageShuffleData(killer, TEST_DATASOURCE, reports);
}

Assert.assertTrue("Phase 2 should have failed", phase2Failed);
EasyMock.verify(killer);
}

/**
 * Builds a deep-storage partition stat whose loadSpec uses the given storage
 * type/bucket/key (shaped like an S3 loadSpec).
 */
private static DeepStoragePartitionStat createDeepStoragePartitionStat(
Interval interval,
int bucketId,
String type,
String bucket,
String key
)
{
return new DeepStoragePartitionStat(
interval,
new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null),
ImmutableMap.of("type", type, "bucket", bucket, "key", key)
);
}

/**
 * Builds a local-shuffle (GenericPartitionStat) partition stat; these must be
 * ignored by deep storage cleanup.
 */
private static GenericPartitionStat createLocalPartitionStat(Interval interval, int bucketId)
{
return new GenericPartitionStat(
"host",
8080,
false,
interval,
new DimensionRangeBucketShardSpec(bucketId, List.of("dim1"), null, null),
null,
null
);
}
}

}
Loading