Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateAndStatus;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
Expand Down Expand Up @@ -90,7 +90,7 @@ public List<CompactionJob> createCompactionJobs(

// Create a job for each CompactionCandidate
while (segmentIterator.hasNext()) {
final CompactionCandidate candidate = segmentIterator.next();
final CompactionCandidateAndStatus candidate = segmentIterator.next();

// Allow template-specific customization of the config per candidate
DataSourceCompactionConfig finalConfig = configOptimizer.optimizeConfig(config, candidate, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.indexing.template.BatchIndexingJob;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateAndStatus;
import org.apache.druid.timeline.CompactionState;

/**
* {@link BatchIndexingJob} to compact an interval of a datasource.
*/
public class CompactionJob extends BatchIndexingJob
{
private final CompactionCandidate candidate;
private final CompactionCandidateAndStatus candidate;
private final int maxRequiredTaskSlots;
private final String targetIndexingStateFingerprint;
private final CompactionState targetIndexingState;

public CompactionJob(
ClientCompactionTaskQuery task,
CompactionCandidate candidate,
CompactionCandidateAndStatus candidate,
int maxRequiredTaskSlots,
String targetIndexingStateFingerprint,
CompactionState targetIndexingState
Expand All @@ -52,7 +52,7 @@ public CompactionJob(

public CompactionJob(
ClientSqlQuery msqQuery,
CompactionCandidate candidate,
CompactionCandidateAndStatus candidate,
int maxRequiredTaskSlots,
String targetIndexingStateFingerprint,
CompactionState targetIndexingState
Expand All @@ -70,7 +70,7 @@ public String getDataSource()
return candidate.getDataSource();
}

public CompactionCandidate getCandidate()
public CompactionCandidateAndStatus getCandidate()
{
return candidate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
Expand All @@ -39,11 +40,10 @@
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateStorage;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateAndStatus;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.CompactionSnapshotBuilder;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
Expand Down Expand Up @@ -161,7 +161,7 @@
if (supervisor.shouldCreateJobs() && !activeSupervisors.contains(supervisorId)) {
// Queue fresh jobs
final List<CompactionJob> jobs = supervisor.createJobs(source, jobParams);
jobs.forEach(job -> snapshotBuilder.addToPending(job.getCandidate()));
jobs.forEach(job -> snapshotBuilder.addToPending(job.getCandidate().getCandidate()));

queue.addAll(jobs);
activeSupervisors.add(supervisorId);
Expand Down Expand Up @@ -242,7 +242,7 @@
// Add this job back to the queue
queue.add(job);
} else {
snapshotBuilder.moveFromPendingToCompleted(job.getCandidate());
snapshotBuilder.moveFromPendingToCompleted(job.getCandidate().getCandidate());
}
}

Expand Down Expand Up @@ -273,27 +273,32 @@
)
{
// Check if the job is a valid compaction job
final CompactionCandidate candidate = job.getCandidate();
final CompactionCandidateAndStatus candidate = job.getCandidate();
final CompactionConfigValidationResult validationResult = validateCompactionJob(job);
if (!validationResult.isValid()) {
log.error("Skipping invalid compaction job[%s] due to reason[%s].", job, validationResult.getReason());
snapshotBuilder.moveFromPendingToSkipped(candidate);
snapshotBuilder.moveFromPendingToSkipped(candidate.getCandidate());
return false;
}

// Check if the job is already running, completed or skipped
final CompactionStatus compactionStatus = statusTracker.computeCompactionStatus(job.getCandidate(), policy);
switch (compactionStatus.getState()) {
case RUNNING:
return false;
case COMPLETE:
snapshotBuilder.moveFromPendingToCompleted(candidate);
return false;
case SKIPPED:
snapshotBuilder.moveFromPendingToSkipped(candidate);
return false;
default:
break;
CompactionCandidateSearchPolicy.Eligibility eligibility = policy.checkEligibilityForCompaction(candidate);
if (!eligibility.isEligible()) {
statusTracker.onSkippedCandidate(candidate, eligibility.getReason());
return false;
}
// Check if the job is already running or succeeded
final TaskState candidateState = statusTracker.computeCompactionTaskState(job.getCandidate().getCandidate());

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
CompactionStatusTracker.computeCompactionTaskState
should be avoided because it has been deprecated.
if (candidateState != null) {
switch (candidateState) {
case RUNNING:
return false;
case SUCCESS:
snapshotBuilder.moveFromPendingToCompleted(candidate.getCandidate());
return false;
case FAILED:
default:
throw DruidException.defensive("unexpected compaction candidate state[%s]", candidateState);
}
}

// Check if enough compaction task slots are available
Expand All @@ -307,7 +312,7 @@
final String taskId = startTaskIfReady(job);
if (taskId == null) {
// Mark the job as skipped for now as the intervals might be locked by other tasks
snapshotBuilder.moveFromPendingToSkipped(candidate);
snapshotBuilder.moveFromPendingToSkipped(candidate.getCandidate());
return false;
} else {
statusTracker.onTaskSubmitted(taskId, job.getCandidate());
Expand Down Expand Up @@ -341,7 +346,7 @@

log.debug(
"Checking readiness of task[%s] with interval[%s]",
task.getId(), job.getCandidate().getCompactionInterval()
task.getId(), job.getCandidate().getCandidate().getCompactionInterval()
);
try {
taskLockbox.add(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon
updateRequest.getEngine()
);
} else {
return new CompactionSimulateResult(Collections.emptyMap());
return new CompactionSimulateResult(Collections.emptyMap(), null, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.compact;

import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateAndStatus;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;

/**
Expand All @@ -43,7 +44,7 @@ public interface ReindexingConfigOptimizer
*/
DataSourceCompactionConfig optimizeConfig(
DataSourceCompactionConfig config,
CompactionCandidate candidate,
CompactionCandidateAndStatus candidate,
CompactionJobParams params
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.server.compaction.CompactionCandidate;
import org.apache.druid.server.compaction.CompactionCandidateAndStatus;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.ReindexingDeletionRule;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
Expand Down Expand Up @@ -59,7 +60,7 @@ public class ReindexingDeletionRuleOptimizer implements ReindexingConfigOptimize
@Override
public DataSourceCompactionConfig optimizeConfig(
DataSourceCompactionConfig config,
CompactionCandidate candidate,
CompactionCandidateAndStatus candidate,
CompactionJobParams params
)
{
Expand Down Expand Up @@ -96,13 +97,13 @@ public DataSourceCompactionConfig optimizeConfig(
* </p>
*
* @param candidateSegments the {@link CompactionCandidate}
* @param expectedFilter the expected filter (as a NotDimFilter wrapping an OrDimFilter)
* @param expectedFilter the expected filter (as a NotDimFilter wrapping an OrDimFilter)
* @param fingerprintMapper the fingerprint mapper to retrieve applied rules from segment fingerprints
* @return the set of unapplied deletion rules wrapped in a NotDimFilter, or null if all rules have been applied
*/
@Nullable
private NotDimFilter computeRequiredSetOfFilterRulesForCandidate(
CompactionCandidate candidateSegments,
CompactionCandidateAndStatus candidateSegments,
NotDimFilter expectedFilter,
IndexingStateFingerprintMapper fingerprintMapper
)
Expand All @@ -114,7 +115,7 @@ private NotDimFilter computeRequiredSetOfFilterRulesForCandidate(
expectedFilters = ((OrDimFilter) expectedFilter.getField()).getFields();
}

Set<String> uniqueFingerprints = candidateSegments.getSegments().stream()
Set<String> uniqueFingerprints = candidateSegments.getCandidate().getSegments().stream()
.map(DataSegment::getIndexingStateFingerprint)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -229,15 +230,15 @@ private static Set<DimFilter> extractAppliedFilters(CompactionState state)
* Returns true only if the candidate has been compacted before and has a NotDimFilter.
*/
private boolean shouldOptimizeFilterRules(
CompactionCandidate candidate,
CompactionCandidateAndStatus candidate,
DataSourceCompactionConfig config
)
{
if (candidate.getCurrentStatus() == null) {
if (CompactionStatus.State.NOT_ELIGIBLE.equals(candidate.getStatus().getState())) {
return false;
}

if (candidate.getCurrentStatus().getReason().equals(CompactionStatus.NEVER_COMPACTED_REASON)) {
if (CompactionStatus.State.ELIGIBLE.equals(candidate.getStatus().getState())
&& CompactionStatus.NEVER_COMPACTED_REASON.equals(candidate.getStatus().getReason())) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.Table;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
Expand Down Expand Up @@ -450,8 +449,8 @@ public void test_simulateRunWithConfigUpdate()
final CompactionSimulateResult simulateResult = scheduler.simulateRunWithConfigUpdate(
new ClusterCompactionConfig(null, null, null, null, null, null)
);
Assert.assertEquals(1, simulateResult.getCompactionStates().size());
final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING);
Assert.assertEquals(0, simulateResult.getCompactionStates().size());
final Table pendingCompactionTable = simulateResult.getQueuedIntervals();
Assert.assertEquals(
Arrays.asList("dataSource", "interval", "numSegments", "bytes", "maxTaskSlots", "reasonToCompact"),
pendingCompactionTable.getColumnNames()
Expand Down
Loading
Loading