Conversation
It looks like this and #18996 are aiming at similar goals but are taking different approaches. A big difference is that #18996 only works with MSQ compaction and this one only works with non-MSQ compaction tasks. I am wondering if they can coexist. re: this piece,
#18996 deals with the replace issue by using the "upgrade" system that was introduced for concurrent replace (system from #14407, #15039, #15684). The segments that are not being compacted are carried through without modification ("upgraded"). It deals with the `MultipleIntervalSegmentSpec` issue by using a new feature in
Thanks for pointing this out @gianm, I see that #18996 happens to fix compaction on the MSQ side, and that's pretty neat! I do not have much experience with MSQ, given that we are still using Druid v27 (yes, it's old... but we are upgrading soon). On our production servers, we used this PR by writing a script to select segments and issue minor compaction specs. There are further plans to incorporate segment selection into automatic compaction. I would like to ask what the direction is for handling specific segments? I see that there are some discussions about
Hi @GWphua , I took a glance at your PR; a few questions:
Thanks for taking a look @cecemei. Minor compaction depends on the SEGMENT lock introduced in #7547 (see #7491 for more info). The use of segment lock assumes that the number of segments in the same datasource and same interval will not exceed 32768. Let's say there are 100 segments in the same interval, and we specify segments 0~4 in the SpecificSegmentsSpec to be compacted: a new segment with partitionId 32768 is created. The rest of the segments (5~100) will stay queryable and available because they belong to the same MAJOR version. Segments 0~4 will then be scheduled to be killed. Now say we do a major compaction right after (on the partitions {5, 6, ..., 99, 100, 32768}). IIUC, all segments in the same interval will be upgraded to a new MAJOR version. If the major compaction results in N segments created, we will see partitionIds {0, 1, ..., N-2, N-1}, all of them in the newer MAJOR version.
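The partition-id bookkeeping described above can be sketched as a small simulation. This is my own model, not Druid code: the constant 32768 mirrors the non-root-generation start partition id referenced in this thread (cf. `org.apache.druid.timeline.partition.PartitionIds`), and the helper names are hypothetical.

```python
NON_ROOT_GEN_START = 32768  # assumed non-root-generation start partition id

def minor_compact(partition_ids, targets):
    """Replace `targets` with one new non-root-generation partition,
    leaving the rest of the interval untouched (same MAJOR version)."""
    survivors = [p for p in partition_ids if p not in targets]
    # next free id at or above the non-root-generation boundary
    new_id = max([NON_ROOT_GEN_START - 1]
                 + [p for p in partition_ids if p >= NON_ROOT_GEN_START]) + 1
    return survivors + [new_id]

def major_compact(partition_ids, num_output_segments):
    """Rewrite the whole interval into a new MAJOR version with
    root-generation partition ids 0..N-1."""
    return list(range(num_output_segments))

interval = list(range(100))                      # segments 0..99, one interval
after_minor = minor_compact(interval, set(range(5)))
# segments 5..99 survive untouched, plus the newly written segment 32768
after_major = major_compact(after_minor, 10)     # fresh ids 0..9, new version
```

Running `minor_compact` again on `after_minor` would hand out 32769, and so on, which is why the segment-lock scheme caps the number of root-generation segments per interval.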
@GWphua , as @cecemei points out, there are some concerns with this implementation:
I would advise revising the implementation in this PR to make use of REPLACE/APPEND locks (i.e. the locks that support concurrent use) and the upgrade mechanism, similar to what is being done in #19059 (a revision of #18996). In short, the approach would be something like this:
Thanks for the detailed rundown of the motivation behind removing segment locks; I will work on this after #19059 is merged. I have some concerns regarding the new mechanism, and I'm not sure whether some of the things I enjoyed when using segmentLock will be preserved. Answers to these questions would greatly help me understand things faster:
Thanks, @GWphua ! In #19059, compaction tasks will use Supporting multiple concurrent Meanwhile, with the support of minor compaction coupled with compaction supervisors (which are much more reactive in scheduling compaction jobs than the old-style compaction duty on the Coordinator), I feel that compaction would be fast enough that we would not need to launch multiple jobs for the same interval anyway. Please let me know if that answers your question.
Thanks @kfaraz! Love the clarification about locks! I know that segment lock is kind of a can of worms, and since we are shifting to this new lock mechanism, which sounds more polished, I'm happy to make changes to my current implementation.

As for compaction being fast, let me share a bit about our use case: we have not been actively compacting our segments, and are recently trying to apply compaction across datasources that use Kafka ingestion. These datasources have months' worth of data, with 10k+ segments per segmentGranularity (hour). A single major compaction takes 8~10 hours for a single time chunk. We thought of this strategy: compact the newer data and let the older data expire. This does not solve the issue when compaction is slower than ingestion (and also does not work for clusters using loadForever).

By implementing concurrent minor compactions, we can parallelize the workload. Each minor compaction takes 5 min. The shorter time, coupled with multiple minor tasks submitted simultaneously (sometimes for the same time chunk), reduces what would take an estimated 24 months of compaction time down to 18 days. Providing concurrent minor compaction will help users who need their segments compacted urgently.

The main concern with compaction is whether its speed can keep up with our ingestion load. We have already compacted all of our historical data, and no longer have a high compaction-speed requirement. I would love to test the MSQ implementation of minor compaction, and if the results are satisfactory then this can take a back seat. 👍
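The time savings described above can be checked with back-of-envelope arithmetic. All figures here are assumptions loosely based on the thread (9 h per major compaction of one hourly chunk, 5 min per minor task, 4 task slots, ~2,000 chunks, ~10 minor tasks per chunk); the exact 24-month/18-day numbers depend on the real segment counts.

```python
MAJOR_MIN_PER_CHUNK = 9 * 60   # assumed: one major compaction per hourly chunk
MINOR_MIN_PER_TASK = 5         # assumed: one minor compaction task

def wall_clock_minutes(tasks_per_chunk, chunks, concurrency):
    """Wall-clock minutes if tasks_per_chunk * chunks minor tasks
    run `concurrency` at a time."""
    total = tasks_per_chunk * chunks * MINOR_MIN_PER_TASK
    return total / concurrency

# e.g. 2,000 hourly chunks, 10 minor tasks per chunk, 4 task slots:
minor_total = wall_clock_minutes(10, 2000, 4)   # 25,000 min, roughly 17 days
major_total = MAJOR_MIN_PER_CHUNK * 2000        # 1,080,000 min, roughly 25 months
```

Under these assumed figures the sequential-major vs concurrent-minor ratio lands in the same ballpark as the 24 months vs 18 days quoted above.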
Sounds good, @GWphua ! Thanks for providing the context. FYI, #19059 has been merged. Please feel free to try it out. |
Hi @kfaraz I have tested out MSQ minor + major compaction. We are able to run multiple workers on the same interval, which speeds up the compaction process. I am very happy with this outcome.
For a single interval, major compaction tasks timed out after running >8 h, while minor compaction accomplishes it within 10 min.
That is great news, @GWphua !
Wow, those are great time savings!
By this, do you mean launching a single minor compaction job with a large number of MSQ workers?
4 task slots are reserved for each case, and each task slot is given the same CPU/memory resources. For the minor case, there is an additional requirement to specify the segment IDs to be retrieved for compaction. On my side, I tweaked my minor compaction script to first search for compactible segments, then submit these segments for compaction.
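The "search, then submit" flow above could be sketched as below. This is only an illustration: the helper names and the size threshold are mine, and the inputSpec type/field names (`uncompacted`/`uncompactedSegments`) follow the naming discussed later in this thread, which may not match the final API.

```python
def pick_compactible(segments, max_bytes=100 * 1024 * 1024):
    """Select small segments worth compacting (here: below ~100 MB,
    an arbitrary threshold)."""
    return [s for s in segments if s["size"] < max_bytes]

def build_minor_compaction_task(datasource, interval, segment_descriptors):
    """Build a native compaction task payload targeting only the given
    segments (field names are assumptions from this thread)."""
    return {
        "type": "compact",
        "dataSource": datasource,
        "ioConfig": {
            "type": "compact",
            "inputSpec": {
                "type": "uncompacted",
                "interval": interval,
                "uncompactedSegments": segment_descriptors,
            },
        },
        "context": {"useConcurrentLocks": True},
    }
```

A driver script would fetch segment metadata (e.g. via `sys.segments`), run `pick_compactible`, and POST the resulting payload to the Overlord's task endpoint.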
Ran native minor compaction tasks on a test cluster:
@GWphua , wouldn't native minor compaction work with
kfaraz left a comment:
Overall looks good, left some minor queries/suggestions.
Once these are resolved, we should be good to go.
|-----|-----------|-------|--------|
|`type`|Task type. Set the value to `compact`.|none|Yes|
- |`inputSpec`|Specification of the target [interval](#interval-inputspec) or [segments](#segments-inputspec).|none|Yes|
+ |`inputSpec`|Specification of the target [interval](#interval-inputspec) or [uncompacted](#uncompacted-inputspec).|none|Yes|
I wonder if we should change the type name from `uncompacted` to `minor`.
Since this feature has not been released yet, I think we still have time to fix it up.
If we do this, the field name can be changed from `uncompactedSegments` to `segmentsToCompact`.
`uncompacted` is slightly incorrect because there may be segments that were appended (by a concurrent APPEND job) after the minor compaction task started. The minor compaction task would not compact these segments (since they are not present in `uncompactedSegments` passed to the spec); they would simply be upgraded instead.
Agree.
I was thinking about a `minor` inputSpec, and changing the field name to `segments` for brevity, especially since we know minor compaction is for targeting specific segments. Let me know if that's OK!
### Segment `inputSpec`
The segment `inputSpec` is deprecated, instructions for usage will no longer be documented. Please use the above 2 `inputSpec` instead.
- The segment `inputSpec` is deprecated, instructions for usage will no longer be documented. Please use the above 2 `inputSpec` instead.
+ The segment `inputSpec` is deprecated, instructions for usage will no longer be documented. Please use the types `interval` or `uncompacted` instead.
// Native minor compaction uses REPLACE ingestion mode, which uses time chunk lock.
if (compactionTask.getIoConfig().getInputSpec() instanceof MinorCompactionInputSpec) {
  newContext.put(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
  newContext.put(Tasks.USE_CONCURRENT_LOCKS, true);
Why should we force useConcurrentLocks here?
If useConcurrentLocks is a requirement for minor compaction, I think we should validate that in the CompactionTask constructor instead so that users are aware of the requirement.
Also, I wonder if there is any real benefit to forcing these context parameters here since IIUC, the locks would have already been acquired at this point.
> locks would have already been acquired at this point.
True. Compaction tasks without `useConcurrentLocks` set to true will fail with this error:
`org.apache.druid.java.util.common.IOE: Error with status[500 Server Error] and message[{"error":"org.apache.druid.error.DruidException: Segments to upgrade must be covered by a REPLACE lock. Only [0] out of [454] segments are covered."}]. Check overlord logs for details.`
I will use a validator to return a better error message.
final MinorCompactionInputSpec minorSpec = new MinorCompactionInputSpec(
    testInterval,
    ImmutableList.of(segment6.toDescriptor(), segment7.toDescriptor(), segment8.toDescriptor())
Nit: Use List.of(), Map.of() and Set.of() for brevity.
Sure, it would also be nice to include a standard for preferring these over other APIs. (Though I foresee a large amount of changes that may come with it 🥴)
final TestTaskActionClient taskActionClient = new TestTaskActionClient(allSegmentsInInterval);

// Verify findSegmentsToLock() returns ALL segments in the interval (no filtering)
final List<DataSegment> segmentsToLock = compactionTask.findSegmentsToLock(
IIUC, the findSegmentsToLock() method is invoked only when we go down the segment locking code flow.
Ideally, CompactionTask.isReady() would call determineLockGranularityAndTryLock() which would short-circuit to return a TIME_CHUNK lock and we would never go down the path that involves methods determineSegmentGranularity() or findSegmentsToLock().
If needed, we can perform validations in the CompactionTask constructor to ensure that minor compaction is used only with ingestionMode == REPLACE. We need not support REPLACE_LEGACY (which uses isDropExisting: false) since it is a legacy mode and has been deprecated for a while. I don't think that mode makes sense in the context of compaction either.
Let me know what you think.
Since it is a legacy mode, there's no need to support it anymore. I have added validation and appropriate tests.
Hi @kfaraz Thanks for the suggestions. The main changes here are: type name from Do take a look again. Thanks!

Fixes #9712
Fixes #9768
Motivation
Submitting a compaction task with `SpecificSegmentsSpec` (segment IDs) would cause Druid to lock, read, and rewrite all segments in the umbrella interval, defeating the purpose of targeting specific segments. This results in very long compaction tasks, as the entire interval's segments are considered for compaction. With the changes introduced here, we can select multiple small segments to compact instead of processing all segments in the interval. This reduces the time taken for compaction from ~3 h to ~5 min.
Description
This PR introduces minor compaction to the native engine. Users are expected to submit a MinorCompactionInputSpec targeting only the specified segments.
Segments in the same interval that are not in the spec are upgraded in place via `MarkSegmentToUpgradeAction` (a metadata-only version bump, no physical rewrite).
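The split between rewritten and upgraded segments can be illustrated with a minimal sketch. The function name and segment shape are mine, not Druid's API; it only mirrors the behaviour described above, where segments absent from the spec get a metadata-only version bump.

```python
def apply_minor_compaction(interval_segments, spec_ids, new_version):
    """Partition an interval's segments: those in the spec are physically
    rewritten; the rest get a metadata-only version bump (mirroring
    MarkSegmentToUpgradeAction)."""
    rewritten, upgraded = [], []
    for seg in interval_segments:
        if seg["id"] in spec_ids:
            rewritten.append(seg)  # read + rewritten into new segments
        else:
            upgraded.append({**seg, "version": new_version})  # metadata only
    return rewritten, upgraded
```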
The bulk of this PR is built upon the changes made in #19059
Also, removing segment locking in the future will close all related issues, such as #9571, #10911 (there should be many more issues I'm not listing here)
Release note
You can now submit native-engine minor compaction tasks using MinorCompactionInputSpec.
Submitting compaction tasks using the `segments` inputSpec is now deprecated. Use the `uncompacted` inputSpec instead.

Key changed/added classes in this PR

- `CompactionTask`
- `NativeCompactionRunner`
- `IndexTask`
- `ParallelIndexSupervisorTask`
- `SinglePhaseSubTask`
- `MSQCompactionRunner`

This PR has: