From 8e13fb871542b2a55b32d4693f635befd5ae7f03 Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 10:13:19 -0700 Subject: [PATCH 1/8] Revert "fix native compaction oom issue (#19121)" This reverts commit ad1b5f28bfecfd7b3cbd7f86d0d03c0af61df9b8. --- .../indexing/common/task/CompactionTask.java | 2 +- .../task/CompactionTaskParallelRunTest.java | 2 +- .../common/task/CompactionTaskRunBase.java | 117 ++++-------------- .../task/NativeCompactionTaskRunTest.java | 2 +- .../msq/exec/MSQCompactionTaskRunTest.java | 2 +- 5 files changed, 26 insertions(+), 99 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index d7a224b05858..5c431648169b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -636,7 +636,7 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals( toolbox.getEmitter(), metricBuilder, segmentProvider.dataSource, - JodaUtils.umbrellaInterval(Iterables.transform(timelineSegments, DataSegment::getInterval)), + umbrellaInterval(timelineSegments, segmentProvider), lazyFetchSegments( timelineSegments, toolbox.getSegmentCacheManager() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 6b12ad84461c..75d046593883 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -372,7 +372,7 @@ public void testRunParallelWithRangePartitioningAndNoUpfrontSegmentFetching() th Granularities.HOUR, Granularities.MINUTE, true, - ImmutableList.of(Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z")) + ImmutableList.of(INTERVAL_TO_INDEX) ), null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java index b8f14ae1a3de..50b53e614886 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java @@ -170,7 +170,6 @@ public abstract class CompactionTaskRunBase protected static final Interval TEST_INTERVAL_DAY = Intervals.of("2014-01-01/2014-01-02"); protected static final Interval TEST_INTERVAL = Intervals.of("2014-01-01T00:00:00Z/2014-01-01T06:00:00Z"); - protected static final Interval TEST_ACTUAL_INTERVAL = Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z"); protected static final List<String> TEST_ROWS = ImmutableList.of( "2014-01-01T00:00:10Z,a,1\n", "2014-01-01T00:00:10Z,b,2\n", @@ -305,13 +304,7 @@ public void testRunWithDynamicPartitioning() throws Exception final List<DataSegment> segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments()); List<String> rowsFromSegment = getCSVFormatRowsFromSegments(segments); Assert.assertEquals(TEST_ROWS, rowsFromSegment); - verifyCompactedSegment( - compactionTask.getCompactionRunner(), - segments, - segmentGranularity, - DEFAULT_QUERY_GRAN, - false - ); + verifyCompactedSegment(segments, segmentGranularity, DEFAULT_QUERY_GRAN, false); }
@Test @@ -371,44 +364,14 @@ public void testRunCompactionTwice() throws Exception final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); - verifyCompactedSegment( - compactionTask1.getCompactionRunner(), - List.copyOf(resultPair1.rhs.getSegments()), - segmentGranularity, - DEFAULT_QUERY_GRAN, - false - ); + verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, false); final CompactionTask compactionTask2 = compactionTaskBuilder(segmentGranularity).interval(inputInterval, true).build(); final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair2 = runTask(compactionTask2); verifyTaskSuccessRowsAndSchemaMatch(resultPair2, TOTAL_TEST_ROWS); - if (segmentGranularity == null || segmentGranularity.equals(Granularities.HOUR)) { - verifyCompactedSegment( - compactionTask1.getCompactionRunner(), - List.copyOf(resultPair2.rhs.getSegments()), - segmentGranularity, - DEFAULT_QUERY_GRAN, - false - ); - } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) { - Set<DataSegment> compactedSegments = resultPair2.rhs.getSegments(); - Assert.assertEquals(1, compactedSegments.size()); - - DataSegment compactedSegment = Iterables.getOnlyElement(compactedSegments); - Assert.assertEquals(TEST_INTERVAL, compactedSegment.getInterval()); - - // compact interval is always the SIX_HOUR interval, since the previous compaction has generated a new SIX_HOUR segment, this means the compaction state in the second compaction is different from the first one. - Assert.assertEquals( - getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(TEST_INTERVAL)), - compactedSegment.getLastCompactionState() - ); - Assert.assertEquals(new NumberedShardSpec(0, 1), compactedSegment.getShardSpec()); - - } else { - throw new RE("Gran[%s] is not supported", segmentGranularity); - } + verifyCompactedSegment(List.copyOf(resultPair2.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, false); } @Test @@ -422,13 +385,7 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); - verifyCompactedSegment( - compactionTask1.getCompactionRunner(), - List.copyOf(resultPair1.rhs.getSegments()), - segmentGranularity, - DEFAULT_QUERY_GRAN, - true - ); + verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, true); final CompactionTask compactionTask2 = compactionTaskBuilder(segmentGranularity).interval(inputInterval, false).build(); @@ -442,9 +399,9 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception for (int i = 0; i < 3; i++) { Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1); Assert.assertEquals(interval, segments.get(i).getInterval()); - Interval compactInterval = segmentGranularity == null ? interval : TEST_ACTUAL_INTERVAL; + Interval inputInterval = segmentGranularity == null ?
interval : this.inputInterval; Assert.assertEquals( - getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, DEFAULT_QUERY_GRAN, List.of(compactInterval)), + getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, DEFAULT_QUERY_GRAN, List.of(inputInterval)), segments.get(i).getLastCompactionState() ); // overwrite shard starts at NON_ROOT_GEN_START_PARTITION_ID + 1, and minor version 2 for the second compaction @@ -459,9 +416,8 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); - // compact interval is always the SIX_HOUR interval, since the previous compaction has generated a new SIX_HOUR segment, this means the compaction state in the second compaction is different from the first one. Assert.assertEquals( - getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(TEST_INTERVAL)), + getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(inputInterval)), segments.get(0).getLastCompactionState() ); // use overwrite shard for the second compaction @@ -480,11 +436,7 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception @Test public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exception { - Assume.assumeTrue( - "test with defined segment granularity and interval in this test", - Granularities.SIX_HOUR.equals(segmentGranularity) && TEST_INTERVAL.equals(inputInterval) - && lockGranularity != LockGranularity.SEGMENT - ); + Assume.assumeTrue("Use 3 hr interval to compact", TEST_INTERVAL.equals(inputInterval)); verifyTaskSuccessRowsAndSchemaMatch(runIndexTask(), TOTAL_TEST_ROWS); final CompactionTask compactionTask = @@ -535,7 +487,6 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc Pair<TaskStatus, DataSegmentsWithSchemas> compactionResult = compactionFuture.get(); verifyTaskSuccessRowsAndSchemaMatch(compactionResult, TOTAL_TEST_ROWS); verifyCompactedSegment( - compactionTask.getCompactionRunner(), List.copyOf(compactionResult.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, @@ -588,7 +539,11 @@ public void testWithSegmentGranularityMisalignedIntervalAllowed() throws Excepti Assert.assertEquals(Intervals.of("2013-12-30/2014-01-06"), segments.get(0).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); Assert.assertEquals( - getDefaultCompactionState(Granularities.WEEK, DEFAULT_QUERY_GRAN, List.of(TEST_ACTUAL_INTERVAL)), + getDefaultCompactionState( + Granularities.WEEK, + DEFAULT_QUERY_GRAN, + List.of(inputInterval) + ), segments.get(0).getLastCompactionState() ); } @@ -649,14 +604,10 @@ public void testCompactionWithFilterInTransformSpec() throws Exception Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); - // native runner use interval from actual seen segments, while msq runner use interval from input - Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner - ?
TEST_ACTUAL_INTERVAL - : inputInterval; CompactionState expectedCompactionState = getDefaultCompactionState( segmentGranularity, DEFAULT_QUERY_GRAN, - List.of(compactInterval) + List.of(inputInterval) ).toBuilder().transformSpec(new CompactionTransformSpec(new SelectorDimFilter("dim", "a", null), null)).build(); Assert.assertEquals(expectedCompactionState, segments.get(0).getLastCompactionState()); } @@ -712,7 +663,7 @@ public void testCompactionWithNewMetricInMetricsSpec() throws Exception AggregatorFactory expectedCountMetric = new CountAggregatorFactory("cnt"); AggregatorFactory expectedLongSumMetric = new LongSumAggregatorFactory("val", "val"); CompactionState expectedCompactionState = - getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, List.of(TEST_ACTUAL_INTERVAL)) + getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, List.of(inputInterval)) .toBuilder() .metricsSpec(List.of(expectedCountMetric, expectedLongSumMetric)) .build(); @@ -734,13 +685,7 @@ public void testWithGranularitySpecNonNullQueryGranularity() throws Exception verifyTaskSuccessRowsAndSchemaMatch(resultPair, TOTAL_TEST_ROWS); List<DataSegment> segments = new ArrayList<>(resultPair.rhs.getSegments()); - verifyCompactedSegment( - compactionTask1.getCompactionRunner(), - segments, - segmentGranularity, - Granularities.SECOND, - false - ); + verifyCompactedSegment(segments, segmentGranularity, Granularities.SECOND, false); } @Test @@ -764,12 +709,8 @@ public void testWithGranularitySpecNonNullQueryGranularityAndCoarseSegmentGranul List<DataSegment> segments = List.copyOf(resultPair.rhs.getSegments()); Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL_DAY, segments.get(0).getInterval()); - // native runner use interval from actual seen segments, while msq runner use interval from input - Interval interval = compactionTask1.getCompactionRunner() instanceof NativeCompactionRunner - ? TEST_ACTUAL_INTERVAL - : TEST_INTERVAL_DAY; Assert.assertEquals( - getDefaultCompactionState(Granularities.DAY, Granularities.DAY, List.of(interval)), + getDefaultCompactionState(Granularities.DAY, Granularities.DAY, List.of(TEST_INTERVAL_DAY)), segments.get(0).getLastCompactionState() ); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); @@ -1240,14 +1181,10 @@ public void testRunWithSpatialDimensions() throws Exception Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); - // native runner use interval from actual seen segments, while msq runner use interval from input - Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner - ? Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z") - : inputInterval; CompactionState defaultCompactionState = getDefaultCompactionState( Granularities.SIX_HOUR, Granularities.MINUTE, - List.of(compactInterval) + List.of(TEST_INTERVAL) ); CompactionState newCompactionState = defaultCompactionState.toBuilder().dimensionsSpec( @@ -1354,15 +1291,11 @@ public void testRunWithAutoCastDimensions() throws Exception final List<String> dimensionExclusions = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner ? List.of() : List.of("__time", "val"); - // native runner use interval from actual seen segments, while msq runner use interval from input - final Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner - ?
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z") - : TEST_INTERVAL; CompactionState expectedState = getDefaultCompactionState( Granularities.SIX_HOUR, Granularities.MINUTE, - List.of(compactInterval) + List.of(TEST_INTERVAL) ).toBuilder() .dimensionsSpec( new DimensionsSpec(Arrays.asList(/* check explicitly specified types are preserved */ @@ -1795,7 +1728,6 @@ public void verifyTaskSuccessRowsAndSchemaMatch(Pair segments, Granularity gran, Granularity queryGran, @@ -1808,10 +1740,9 @@ protected void verifyCompactedSegment( for (int i = 0; i < 3; i++) { Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1); Assert.assertEquals(interval, segments.get(i).getInterval()); - // native runner use interval from actual seen segments, while msq runner use never use this gran - Interval compactInterval = gran == null ? interval : TEST_ACTUAL_INTERVAL; + Interval inputInterval = gran == null ? interval : this.inputInterval; Assert.assertEquals( - getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, List.of(compactInterval)), + getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, List.of(inputInterval)), segments.get(i).getLastCompactionState() ); if (useOverwriteShard) { @@ -1826,12 +1757,8 @@ protected void verifyCompactedSegment( } else if (gran.equals(Granularities.SIX_HOUR)) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); - // native runner use interval from actual seen segments, while msq runner use interval from input - Interval compactInterval = compactionRunner instanceof NativeCompactionRunner - ? TEST_ACTUAL_INTERVAL - : inputInterval; Assert.assertEquals( - getDefaultCompactionState(gran, queryGran, List.of(compactInterval)), + getDefaultCompactionState(gran, queryGran, List.of(inputInterval)), segments.get(0).getLastCompactionState() ); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java index 3e306e4a9a46..d3430d90b232 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java @@ -35,7 +35,7 @@ @RunWith(Parameterized.class) public class NativeCompactionTaskRunTest extends CompactionTaskRunBase { - @Parameterized.Parameters(name = "name={0}, inputInterval={6}, segmentGran={7}") + @Parameterized.Parameters(name = "name={0}, inputInterval={5}, segmentGran={6}") public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 758f27a741f4..512354177990 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -126,7 +126,7 @@ public class MSQCompactionTaskRunTest extends CompactionTaskRunBase private final ConcurrentHashMap taskActionClients = new ConcurrentHashMap<>(); private Injector injector; - @Parameterized.Parameters(name = "name: {0}, inputInterval={6}, segmentGran={7}") + @Parameterized.Parameters(name = 
"name: {0}, inputInterval={5}, segmentGran={6}") public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); From 58e5c6990b220af62366e67cb4e930b7aaf6af14 Mon Sep 17 00:00:00 2001 From: cecemei Date: Tue, 10 Mar 2026 10:58:51 -0700 Subject: [PATCH 2/8] schema --- .../org/apache/druid/indexing/common/task/CompactionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 5c431648169b..d7a224b05858 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -636,7 +636,7 @@ static Map createDataSchemasForIntervals( toolbox.getEmitter(), metricBuilder, segmentProvider.dataSource, - umbrellaInterval(timelineSegments, segmentProvider), + JodaUtils.umbrellaInterval(Iterables.transform(timelineSegments, DataSegment::getInterval)), lazyFetchSegments( timelineSegments, toolbox.getSegmentCacheManager() From 3565c2a8e75ddd287ca4e5352e20692d60230646 Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 10:33:21 -0700 Subject: [PATCH 3/8] revert --- .../java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 5e6fc4153770..0d70fccc03b5 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -593,7 +593,6 @@ public void testMinorCompaction() throws Exception final Pair resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); verifyCompactedSegment( - compactionTask1.getCompactionRunner(), List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, From 6ae11b195b360f020bbb3ce17f92676ddaf5412b Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 11:20:36 -0700 Subject: [PATCH 4/8] flaky-gha --- .github/workflows/worker.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/.github/workflows/worker.yml b/.github/workflows/worker.yml index 600287dded77..ccda2a8fe646 100644 --- a/.github/workflows/worker.yml +++ b/.github/workflows/worker.yml @@ -89,6 +89,27 @@ jobs: - name: 'Execute: ${{ inputs.script }}' run: ${{ inputs.script }} timeout-minutes: 60 + - name: Summarize flaky tests + if: always() + run: | + # Find all flaky failures + flaky_count=$(find . -name "TEST-*.xml" -exec grep -c "> $GITHUB_STEP_SUMMARY + + # If there are any, list them as bullets + if [ "$flaky_count" -gt 0 ]; then + echo "" >> $GITHUB_STEP_SUMMARY + echo "#### Flaky Test Details" >> $GITHUB_STEP_SUMMARY + find . -name "TEST-*.xml" -exec grep -H "> $GITHUB_STEP_SUMMARY + fi + + - name: Normalize flaky failures + if: always() + run: | + find . 
-name "TEST-*.xml" -exec sed -i 's/flakyFailure/failure/g' {} + - name: Publish Test Report if: ${{ hashFiles('**/TEST-*.xml') != '' }} From dd938e3a7661e37de9f8dd27f5133f82238ef6b3 Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 12:22:33 -0700 Subject: [PATCH 5/8] flaky --- .github/workflows/worker.yml | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/.github/workflows/worker.yml b/.github/workflows/worker.yml index ccda2a8fe646..623a2af9cc14 100644 --- a/.github/workflows/worker.yml +++ b/.github/workflows/worker.yml @@ -89,21 +89,19 @@ jobs: - name: 'Execute: ${{ inputs.script }}' run: ${{ inputs.script }} timeout-minutes: 60 + - name: Summarize flaky tests if: always() + shell: bash run: | - # Find all flaky failures - flaky_count=$(find . -name "TEST-*.xml" -exec grep -c "> $GITHUB_STEP_SUMMARY + echo "### Flaky Tests: $flaky_count found" - # If there are any, list them as bullets if [ "$flaky_count" -gt 0 ]; then - echo "" >> $GITHUB_STEP_SUMMARY - echo "#### Flaky Test Details" >> $GITHUB_STEP_SUMMARY - find . -name "TEST-*.xml" -exec grep -H "> $GITHUB_STEP_SUMMARY + echo "#### Flaky Test Details" + grep -R " Date: Wed, 11 Mar 2026 15:45:36 -0700 Subject: [PATCH 6/8] revert2 --- .../task/CompactionTaskParallelRunTest.java | 2 +- .../common/task/CompactionTaskRunBase.java | 117 ++++++++++++++---- .../task/NativeCompactionTaskRunTest.java | 2 +- .../msq/exec/MSQCompactionTaskRunTest.java | 3 +- 4 files changed, 99 insertions(+), 25 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 75d046593883..6b12ad84461c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -372,7 +372,7 @@ public void testRunParallelWithRangePartitioningAndNoUpfrontSegmentFetching() th Granularities.HOUR, Granularities.MINUTE, true, - ImmutableList.of(INTERVAL_TO_INDEX) + ImmutableList.of(Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z")) ), null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java index 8679824df2ba..3d7e1531f306 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunBase.java @@ -170,6 +170,7 @@ public abstract class CompactionTaskRunBase protected static final Interval TEST_INTERVAL_DAY = Intervals.of("2014-01-01/2014-01-02"); protected static final Interval TEST_INTERVAL = Intervals.of("2014-01-01T00:00:00Z/2014-01-01T06:00:00Z"); + protected static final Interval TEST_ACTUAL_INTERVAL = Intervals.of("2014-01-01T00:00:00Z/2014-01-01T03:00:00Z"); protected static final List TEST_ROWS = ImmutableList.of( "2014-01-01T00:00:10Z,a,1\n", "2014-01-01T00:00:10Z,b,2\n", @@ -304,7 +305,13 @@ public void testRunWithDynamicPartitioning() throws Exception final List segments = new ArrayList<>(dataSegmentsWithSchemas.getSegments()); List rowsFromSegment = getCSVFormatRowsFromSegments(segments); Assert.assertEquals(TEST_ROWS, rowsFromSegment); - verifyCompactedSegment(segments, 
segmentGranularity, DEFAULT_QUERY_GRAN, false); + verifyCompactedSegment( + compactionTask.getCompactionRunner(), + segments, + segmentGranularity, + DEFAULT_QUERY_GRAN, + false + ); } @Test @@ -364,14 +371,44 @@ public void testRunCompactionTwice() throws Exception final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); - verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, false); + verifyCompactedSegment( + compactionTask1.getCompactionRunner(), + List.copyOf(resultPair1.rhs.getSegments()), + segmentGranularity, + DEFAULT_QUERY_GRAN, + false + ); final CompactionTask compactionTask2 = compactionTaskBuilder(segmentGranularity).interval(inputInterval, true).build(); final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair2 = runTask(compactionTask2); verifyTaskSuccessRowsAndSchemaMatch(resultPair2, TOTAL_TEST_ROWS); - verifyCompactedSegment(List.copyOf(resultPair2.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, false); + if (segmentGranularity == null || segmentGranularity.equals(Granularities.HOUR)) { + verifyCompactedSegment( + compactionTask1.getCompactionRunner(), + List.copyOf(resultPair2.rhs.getSegments()), + segmentGranularity, + DEFAULT_QUERY_GRAN, + false + ); + } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) { + Set<DataSegment> compactedSegments = resultPair2.rhs.getSegments(); + Assert.assertEquals(1, compactedSegments.size()); + + DataSegment compactedSegment = Iterables.getOnlyElement(compactedSegments); + Assert.assertEquals(TEST_INTERVAL, compactedSegment.getInterval()); + + // compact interval is always the SIX_HOUR interval, since the previous compaction has generated a new SIX_HOUR segment, this means the compaction state in the second compaction is different from the first one. + Assert.assertEquals( + getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(TEST_INTERVAL)), + compactedSegment.getLastCompactionState() + ); + Assert.assertEquals(new NumberedShardSpec(0, 1), compactedSegment.getShardSpec()); + + } else { + throw new RE("Gran[%s] is not supported", segmentGranularity); + } } @Test @@ -385,7 +422,13 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); - verifyCompactedSegment(List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, true); + verifyCompactedSegment( + compactionTask1.getCompactionRunner(), + List.copyOf(resultPair1.rhs.getSegments()), + segmentGranularity, + DEFAULT_QUERY_GRAN, + true + ); final CompactionTask compactionTask2 = compactionTaskBuilder(segmentGranularity).interval(inputInterval, false).build(); @@ -399,9 +442,9 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception for (int i = 0; i < 3; i++) { Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1); Assert.assertEquals(interval, segments.get(i).getInterval()); - Interval inputInterval = segmentGranularity == null ? interval : this.inputInterval; + Interval compactInterval = segmentGranularity == null ?
interval : TEST_ACTUAL_INTERVAL; Assert.assertEquals( - getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, DEFAULT_QUERY_GRAN, List.of(inputInterval)), + getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, DEFAULT_QUERY_GRAN, List.of(compactInterval)), segments.get(i).getLastCompactionState() ); // overwrite shard starts at NON_ROOT_GEN_START_PARTITION_ID + 1, and minor version 2 for the second compaction @@ -416,8 +459,9 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception } else if (segmentGranularity.equals(Granularities.SIX_HOUR)) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); + // compact interval is always the SIX_HOUR interval, since the previous compaction has generated a new SIX_HOUR segment, this means the compaction state in the second compaction is different from the first one. Assert.assertEquals( - getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(inputInterval)), + getDefaultCompactionState(segmentGranularity, DEFAULT_QUERY_GRAN, List.of(TEST_INTERVAL)), segments.get(0).getLastCompactionState() ); // use overwrite shard for the second compaction @@ -436,7 +480,11 @@ public void testRunCompactionTwiceWithSegmentLock() throws Exception @Test public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exception { - Assume.assumeTrue("Use 3 hr interval to compact", TEST_INTERVAL.equals(inputInterval)); + Assume.assumeTrue( + "test with defined segment granularity and interval in this test", + Granularities.SIX_HOUR.equals(segmentGranularity) && TEST_INTERVAL.equals(inputInterval) + && lockGranularity != LockGranularity.SEGMENT + ); verifyTaskSuccessRowsAndSchemaMatch(runIndexTask(), TOTAL_TEST_ROWS); final CompactionTask compactionTask = @@ -487,6 +535,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc Pair<TaskStatus, DataSegmentsWithSchemas> compactionResult = compactionFuture.get(); verifyTaskSuccessRowsAndSchemaMatch(compactionResult, TOTAL_TEST_ROWS); verifyCompactedSegment( + compactionTask.getCompactionRunner(), List.copyOf(compactionResult.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, @@ -539,11 +588,7 @@ public void testWithSegmentGranularityMisalignedIntervalAllowed() throws Excepti Assert.assertEquals(Intervals.of("2013-12-30/2014-01-06"), segments.get(0).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); Assert.assertEquals( - getDefaultCompactionState( - Granularities.WEEK, - DEFAULT_QUERY_GRAN, - List.of(inputInterval) - ), + getDefaultCompactionState(Granularities.WEEK, DEFAULT_QUERY_GRAN, List.of(TEST_ACTUAL_INTERVAL)), segments.get(0).getLastCompactionState() ); } @@ -604,10 +649,14 @@ public void testCompactionWithFilterInTransformSpec() throws Exception Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); + // native runner use interval from actual seen segments, while msq runner use interval from input + Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner + ?
TEST_ACTUAL_INTERVAL + : inputInterval; CompactionState expectedCompactionState = getDefaultCompactionState( segmentGranularity, DEFAULT_QUERY_GRAN, - List.of(inputInterval) + List.of(compactInterval) ).toBuilder().transformSpec(new CompactionTransformSpec(new SelectorDimFilter("dim", "a", null), null)).build(); Assert.assertEquals(expectedCompactionState, segments.get(0).getLastCompactionState()); } @@ -663,7 +712,7 @@ public void testCompactionWithNewMetricInMetricsSpec() throws Exception AggregatorFactory expectedCountMetric = new CountAggregatorFactory("cnt"); AggregatorFactory expectedLongSumMetric = new LongSumAggregatorFactory("val", "val"); CompactionState expectedCompactionState = - getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, List.of(inputInterval)) + getDefaultCompactionState(segmentGranularity, Granularities.MINUTE, List.of(TEST_ACTUAL_INTERVAL)) .toBuilder() .metricsSpec(List.of(expectedCountMetric, expectedLongSumMetric)) .build(); @@ -685,7 +734,13 @@ public void testWithGranularitySpecNonNullQueryGranularity() throws Exception verifyTaskSuccessRowsAndSchemaMatch(resultPair, TOTAL_TEST_ROWS); List<DataSegment> segments = new ArrayList<>(resultPair.rhs.getSegments()); - verifyCompactedSegment(segments, segmentGranularity, Granularities.SECOND, false); + verifyCompactedSegment( + compactionTask1.getCompactionRunner(), + segments, + segmentGranularity, + Granularities.SECOND, + false + ); } @Test @@ -709,8 +764,12 @@ public void testWithGranularitySpecNonNullQueryGranularityAndCoarseSegmentGranul List<DataSegment> segments = List.copyOf(resultPair.rhs.getSegments()); Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL_DAY, segments.get(0).getInterval()); + // native runner use interval from actual seen segments, while msq runner use interval from input + Interval interval = compactionTask1.getCompactionRunner() instanceof NativeCompactionRunner + ? TEST_ACTUAL_INTERVAL + : TEST_INTERVAL_DAY; Assert.assertEquals( - getDefaultCompactionState(Granularities.DAY, Granularities.DAY, List.of(TEST_INTERVAL_DAY)), + getDefaultCompactionState(Granularities.DAY, Granularities.DAY, List.of(interval)), segments.get(0).getLastCompactionState() ); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); @@ -1181,10 +1240,14 @@ public void testRunWithSpatialDimensions() throws Exception Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); + // native runner use interval from actual seen segments, while msq runner use interval from input + Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner + ? Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z") + : inputInterval; CompactionState defaultCompactionState = getDefaultCompactionState( Granularities.SIX_HOUR, Granularities.MINUTE, - List.of(TEST_INTERVAL) + List.of(compactInterval) ); CompactionState newCompactionState = defaultCompactionState.toBuilder().dimensionsSpec( @@ -1291,11 +1354,15 @@ public void testRunWithAutoCastDimensions() throws Exception final List<String> dimensionExclusions = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner ? List.of() : List.of("__time", "val"); + // native runner use interval from actual seen segments, while msq runner use interval from input + final Interval compactInterval = compactionTask.getCompactionRunner() instanceof NativeCompactionRunner + ?
Intervals.of("2014-01-01T00:00:00Z/2014-01-01T02:00:00Z") + : TEST_INTERVAL; CompactionState expectedState = getDefaultCompactionState( Granularities.SIX_HOUR, Granularities.MINUTE, - List.of(TEST_INTERVAL) + List.of(compactInterval) ).toBuilder() .dimensionsSpec( new DimensionsSpec(Arrays.asList(/* check explicitly specified types are preserved */ @@ -1739,6 +1806,7 @@ public void verifyTaskSuccessRowsAndSchemaMatch(Pair segments, Granularity gran, Granularity queryGran, @@ -1751,9 +1819,10 @@ protected void verifyCompactedSegment( for (int i = 0; i < 3; i++) { Interval interval = Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1); Assert.assertEquals(interval, segments.get(i).getInterval()); - Interval inputInterval = gran == null ? interval : this.inputInterval; + // native runner use interval from actual seen segments, while msq runner use never use this gran + Interval compactInterval = gran == null ? interval : TEST_ACTUAL_INTERVAL; Assert.assertEquals( - getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, List.of(inputInterval)), + getDefaultCompactionState(DEFAULT_SEGMENT_GRAN, queryGran, List.of(compactInterval)), segments.get(i).getLastCompactionState() ); if (useOverwriteShard) { @@ -1768,8 +1837,12 @@ protected void verifyCompactedSegment( } else if (gran.equals(Granularities.SIX_HOUR)) { Assert.assertEquals(1, segments.size()); Assert.assertEquals(TEST_INTERVAL, segments.get(0).getInterval()); + // native runner use interval from actual seen segments, while msq runner use interval from input + Interval compactInterval = compactionRunner instanceof NativeCompactionRunner + ? TEST_ACTUAL_INTERVAL + : inputInterval; Assert.assertEquals( - getDefaultCompactionState(gran, queryGran, List.of(inputInterval)), + getDefaultCompactionState(gran, queryGran, List.of(compactInterval)), segments.get(0).getLastCompactionState() ); Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java index d3430d90b232..3e306e4a9a46 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NativeCompactionTaskRunTest.java @@ -35,7 +35,7 @@ @RunWith(Parameterized.class) public class NativeCompactionTaskRunTest extends CompactionTaskRunBase { - @Parameterized.Parameters(name = "name={0}, inputInterval={5}, segmentGran={6}") + @Parameterized.Parameters(name = "name={0}, inputInterval={6}, segmentGran={7}") public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 0d70fccc03b5..3f182cef2531 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -135,7 +135,7 @@ public class MSQCompactionTaskRunTest extends CompactionTaskRunBase private final ConcurrentHashMap taskActionClients = new ConcurrentHashMap<>(); private Injector injector; - @Parameterized.Parameters(name = "name: {0}, inputInterval={5}, segmentGran={6}") + @Parameterized.Parameters(name = 
"name: {0}, inputInterval={6}, segmentGran={7}") public static Iterable constructorFeeder() { final List constructors = new ArrayList<>(); @@ -593,6 +593,7 @@ public void testMinorCompaction() throws Exception final Pair resultPair1 = runTask(compactionTask1); verifyTaskSuccessRowsAndSchemaMatch(resultPair1, TOTAL_TEST_ROWS); verifyCompactedSegment( + compactionTask1.getCompactionRunner(), List.copyOf(resultPair1.rhs.getSegments()), segmentGranularity, DEFAULT_QUERY_GRAN, From 1faf45a7dd2e67bba1c8465186df5cd2acc75e05 Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 18:26:50 -0700 Subject: [PATCH 7/8] trigger ci / empty commit From 6a106e062786143daab505c212665481d5a766c2 Mon Sep 17 00:00:00 2001 From: cecemei Date: Wed, 11 Mar 2026 20:19:44 -0700 Subject: [PATCH 8/8] test --- .github/workflows/worker.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/worker.yml b/.github/workflows/worker.yml index 623a2af9cc14..ace15444e13c 100644 --- a/.github/workflows/worker.yml +++ b/.github/workflows/worker.yml @@ -91,7 +91,7 @@ jobs: timeout-minutes: 60 - name: Summarize flaky tests - if: always() + if: ${{ hashFiles('**/TEST-*.xml') != '' }} shell: bash run: | # Count flaky failures recursively @@ -105,7 +105,7 @@ jobs: fi - name: Normalize flaky failures - if: always() + if: ${{ hashFiles('**/TEST-*.xml') != '' }} run: | find . -name "TEST-*.xml" -exec sed -i 's/flakyFailure/failure/g' {} +