From bd4efd4273731de7ed7baa6635b1e9507906422a Mon Sep 17 00:00:00 2001 From: minwenjun Date: Thu, 15 Jan 2026 08:17:26 +0800 Subject: [PATCH 1/2] [core] Fix potential data loss in DataEvolutionIterator when the inner reader returns batches with different row counts. --- .../java/org/apache/paimon/CoreOptions.java | 4 + .../reader/DataEvolutionFileReader.java | 163 +++++++++- .../paimon/reader/DataEvolutionIterator.java | 65 ---- .../reader/DataEvolutionFileReaderTest.java | 287 ++++++++++++++++++ .../reader/DataEvolutionIteratorTest.java | 149 --------- .../apache/paimon/AppendOnlyFileStore.java | 3 +- .../operation/DataEvolutionSplitRead.java | 8 +- 7 files changed, 453 insertions(+), 226 deletions(-) delete mode 100644 paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java create mode 100755 paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionFileReaderTest.java delete mode 100644 paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 6dc1e0a91dd6..6876d2e17409 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -3361,6 +3361,10 @@ public boolean overwriteUpgrade() { return options.get(OVERWRITE_UPGRADE); } + public int readBatchSize() { + return options.get(READ_BATCH_SIZE); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 1c50410fa170..0fc8862cf10b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -18,6 +18,7 @@ package org.apache.paimon.reader; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.IOUtils; @@ -58,8 +59,36 @@ public class DataEvolutionFileReader implements RecordReader { private final int[] fieldOffsets; private final RecordReader[] readers; + /** + * Cached iterators for each inner reader. An entry is non-null if the corresponding reader + * currently has an opened batch which is not fully consumed. + */ + private final RecordIterator[] pending; + + /** Marks whether the corresponding reader has reached end of input. */ + private final boolean[] finished; + + /** Maximum number of rows produced for each outer {@link #readBatch()} call. */ + private final int batchSize; + + /** Number of rows emitted by current outer iterator. */ + private int rowsEmittedInCurrentBatch; + + /** Whether any inner reader has reached end of input. */ + private boolean endOfInput; + + @SuppressWarnings("unchecked") public DataEvolutionFileReader( int[] rowOffsets, int[] fieldOffsets, RecordReader[] readers) { + this(rowOffsets, fieldOffsets, readers, CoreOptions.READ_BATCH_SIZE.defaultValue()); + } + + @SuppressWarnings("unchecked") + public DataEvolutionFileReader( + int[] rowOffsets, + int[] fieldOffsets, + RecordReader[] readers, + int batchSize) { checkArgument(rowOffsets != null, "Row offsets must not be null"); checkArgument(fieldOffsets != null, "Field offsets must not be null"); checkArgument( @@ -67,29 +96,109 @@ public DataEvolutionFileReader( "Row offsets and field offsets must have the same length"); checkArgument(rowOffsets.length > 0, "Row offsets must not be empty"); checkArgument(readers != null && readers.length > 1, "Readers should be more than 1"); + checkArgument(batchSize > 0, "Batch size must be greater than 0"); + this.rowOffsets = rowOffsets; this.fieldOffsets = fieldOffsets; this.readers = readers; + this.batchSize = batchSize; + + this.pending = new RecordIterator[readers.length]; + this.finished = new boolean[readers.length]; + this.rowsEmittedInCurrentBatch = 0; + this.endOfInput = false; } @Override @Nullable public RecordIterator readBatch() throws IOException { + if (endOfInput) { + return null; + } + + rowsEmittedInCurrentBatch = 0; DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets); - RecordIterator[] iterators = new RecordIterator[readers.length]; + return new DataEvolutionAlignedIterator(this, row); + } + + @Nullable + InternalRow nextRow(DataEvolutionRow row) throws IOException { + if (endOfInput) { + return null; + } + + // Fetch one row from each non-null reader. for (int i = 0; i < readers.length; i++) { - RecordReader reader = readers[i]; - if (reader != null) { - RecordIterator batch = reader.readBatch(); - if (batch == null) { - // all readers are aligned, as long as one returns null, the others will also - // have no data + if (readers[i] == null) { + // This reader does not contribute any fields. + continue; + } + + InternalRow buffered = fetchNextFromReader(i); + if (buffered == null) { + markEndOfInput(); + return null; + } + + row.setRow(i, buffered); + } + + return row; + } + + private InternalRow fetchNextFromReader(int readerIndex) throws IOException { + while (true) { + if (finished[readerIndex]) { + return null; + } + + RecordIterator iterator = pending[readerIndex]; + if (iterator == null) { + iterator = readers[readerIndex].readBatch(); + if (iterator == null) { + finished[readerIndex] = true; return null; } - iterators[i] = batch; + pending[readerIndex] = iterator; + } + + InternalRow next = iterator.next(); + if (next != null) { + return next; + } + + // current batch is exhausted, release and try next batch + iterator.releaseBatch(); + pending[readerIndex] = null; + } + } + + private void markEndOfInput() { + endOfInput = true; + // Release all pending batches. + for (int i = 0; i < pending.length; i++) { + RecordIterator iterator = pending[i]; + if (iterator != null) { + iterator.releaseBatch(); + pending[i] = null; } } - return new DataEvolutionIterator(row, iterators); + } + + boolean isEndOfInput() { + return endOfInput; + } + + int getBatchSize() { + return batchSize; + } + + int getRowsEmittedInCurrentBatch() { + return rowsEmittedInCurrentBatch; + } + + void incrementRowsEmittedInCurrentBatch() { + rowsEmittedInCurrentBatch++; } @Override @@ -100,4 +209,40 @@ public void close() throws IOException { throw new IOException("Failed to close inner readers", e); } } + + /** + * A {@link org.apache.paimon.reader.RecordReader.RecordIterator} which aligns rows from + * multiple inner readers and assembles them into a {@link DataEvolutionRow}. + */ + class DataEvolutionAlignedIterator implements RecordReader.RecordIterator { + + private final DataEvolutionFileReader fileReader; + private final DataEvolutionRow row; + + DataEvolutionAlignedIterator(DataEvolutionFileReader fileReader, DataEvolutionRow row) { + this.fileReader = fileReader; + this.row = row; + } + + @Nullable + @Override + public InternalRow next() throws IOException { + if (fileReader.isEndOfInput() + || fileReader.getRowsEmittedInCurrentBatch() >= fileReader.getBatchSize()) { + return null; + } + + InternalRow nextRow = fileReader.nextRow(row); + if (nextRow != null) { + fileReader.incrementRowsEmittedInCurrentBatch(); + } + return nextRow; + } + + @Override + public void releaseBatch() { + // Batches of inner readers are released when they are exhausted inside + // {@link DataEvolutionFileReader}. Nothing to do here. + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java deleted file mode 100644 index 17a68454f037..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionIterator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.reader; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.reader.RecordReader.RecordIterator; - -import javax.annotation.Nullable; - -import java.io.IOException; - -/** - * The batch which is made up by several batches, it assumes that all iterators are aligned, and as - * long as one returns null, the others will also have no data. - */ -public class DataEvolutionIterator implements RecordIterator { - - private final DataEvolutionRow row; - private final RecordIterator[] iterators; - - public DataEvolutionIterator(DataEvolutionRow row, RecordIterator[] iterators) { - this.row = row; - this.iterators = iterators; - } - - @Nullable - @Override - public InternalRow next() throws IOException { - for (int i = 0; i < iterators.length; i++) { - if (iterators[i] != null) { - InternalRow next = iterators[i].next(); - if (next == null) { - return null; - } - row.setRow(i, next); - } - } - return row; - } - - @Override - public void releaseBatch() { - for (RecordIterator iterator : iterators) { - if (iterator != null) { - iterator.releaseBatch(); - } - } - } -} diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionFileReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionFileReaderTest.java new file mode 100755 index 000000000000..a95521969a80 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionFileReaderTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.reader; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DataEvolutionFileReader}. */ +public class DataEvolutionFileReaderTest { + + @Test + public void testNextWithData() throws Exception { + int[] rowOffsets = new int[] {0, 1}; + int[] fieldOffsets = new int[] {0, 0}; + + SimpleRecordReader reader1 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(1)}); + SimpleRecordReader reader2 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(2)}); + + @SuppressWarnings("unchecked") + RecordReader[] readers = new RecordReader[] {reader1, reader2}; + + DataEvolutionFileReader evolutionFileReader = + new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers, 2); + + RecordReader.RecordIterator batch = evolutionFileReader.readBatch(); + InternalRow result = batch.next(); + assertThat(result.getInt(0)).isEqualTo(1); + assertThat(result.getInt(1)).isEqualTo(2); + + InternalRow nullResult = batch.next(); + assertThat(nullResult).isNull(); + + evolutionFileReader.close(); + } + + @Test + public void testNextWhenFirstReaderIsEmpty() throws Exception { + int[] rowOffsets = new int[] {0, 1}; + int[] fieldOffsets = new int[] {0, 0}; + + SimpleRecordReader reader1 = new SimpleRecordReader(new InternalRow[0]); + SimpleRecordReader reader2 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(2)}); + + @SuppressWarnings("unchecked") + RecordReader[] readers = new RecordReader[] {reader1, reader2}; + + DataEvolutionFileReader evolutionFileReader = + new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers, 2); + + RecordReader.RecordIterator batch = evolutionFileReader.readBatch(); + InternalRow result = batch.next(); + + assertThat(result).isNull(); + + evolutionFileReader.close(); + } + + @Test + public void testNextWithNullReaderInArray() throws Exception { + int[] rowOffsets = new int[] {0, 2}; + int[] fieldOffsets = new int[] {0, 0}; + + SimpleRecordReader reader1 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(1)}); + SimpleRecordReader reader2 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(2)}); + + @SuppressWarnings("unchecked") + RecordReader[] readers = new RecordReader[] {reader1, null, reader2}; + + DataEvolutionFileReader evolutionFileReader = + new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers, 2); + + RecordReader.RecordIterator batch = evolutionFileReader.readBatch(); + InternalRow result = batch.next(); + + assertThat(result.getInt(0)).isEqualTo(1); + assertThat(result.getInt(1)).isEqualTo(2); + + InternalRow nullResult = batch.next(); + assertThat(nullResult).isNull(); + + evolutionFileReader.close(); + } + + @Test + public void testReleaseInnerIteratorsAfterEndOfInput() throws Exception { + int[] rowOffsets = new int[] {0, 1, 2}; + int[] fieldOffsets = new int[] {0, 0, 0}; + + SimpleRecordReader reader1 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(1)}); + SimpleRecordReader reader2 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(2)}); + SimpleRecordReader reader3 = new SimpleRecordReader(new InternalRow[] {GenericRow.of(3)}); + + @SuppressWarnings("unchecked") + RecordReader[] readers = new RecordReader[] {reader1, null, reader2, reader3}; + + DataEvolutionFileReader evolutionFileReader = + new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers, 2); + + RecordReader.RecordIterator batch = evolutionFileReader.readBatch(); + + // consume the single aligned row + assertThat(batch.next()).isNotNull(); + // second call should trigger end-of-input and release underlying iterators + assertThat(batch.next()).isNull(); + + assertThat(reader1.iterator.released).isTrue(); + assertThat(reader2.iterator.released).isTrue(); + assertThat(reader3.iterator.released).isTrue(); + + evolutionFileReader.close(); + } + + @Test + public void testAlignedBatchesDoNotDropRows() throws Exception { + int totalRows = 10; + + TestRecordReader reader1 = new TestRecordReader(new int[] {8, 2}, totalRows); + TestRecordReader reader2 = new TestRecordReader(new int[] {5, 5}, totalRows); + + int[] rowOffsets = new int[] {0, 1}; + int[] fieldOffsets = new int[] {0, 0}; + @SuppressWarnings("unchecked") + RecordReader[] readers = new RecordReader[] {reader1, reader2}; + + // Force outer batch size to 5 so that we have two batches of 5 rows each. + DataEvolutionFileReader evolutionFileReader = + new DataEvolutionFileReader(rowOffsets, fieldOffsets, readers, 5); + + int totalRead = 0; + List batchSizes = new ArrayList<>(); + + RecordReader.RecordIterator batch; + while ((batch = evolutionFileReader.readBatch()) != null) { + int currentBatch = 0; + InternalRow row; + while ((row = batch.next()) != null) { + currentBatch++; + } + batch.releaseBatch(); + + if (currentBatch == 0) { + break; + } + + batchSizes.add(currentBatch); + totalRead += currentBatch; + } + + evolutionFileReader.close(); + + assertThat(batchSizes).containsExactly(5, 5); + assertThat(totalRead).isEqualTo(totalRows); + } + + private static class SimpleRecordReader implements RecordReader { + + private final TestIterator iterator; + private boolean batchReturned; + + SimpleRecordReader(InternalRow[] rows) { + this.iterator = new TestIterator(rows); + } + + @Nullable + @Override + public RecordIterator readBatch() { + if (batchReturned) { + return null; + } + batchReturned = true; + return iterator; + } + + @Override + public void close() { + // nothing to close + } + } + + private static class TestRecordReader implements RecordReader { + + private final List rows; + private final int[] batchSizes; + private int nextBatchIndex = 0; + private int readOffset = 0; + + TestRecordReader(int[] batchSizes, int totalRows) { + this.batchSizes = batchSizes; + this.rows = new ArrayList<>(totalRows); + for (int i = 0; i < totalRows; i++) { + this.rows.add(GenericRow.of(i)); + } + } + + @Nullable + @Override + public RecordIterator readBatch() { + if (readOffset >= rows.size() || nextBatchIndex >= batchSizes.length) { + return null; + } + + int size = Math.min(batchSizes[nextBatchIndex++], rows.size() - readOffset); + List batch = rows.subList(readOffset, readOffset + size); + readOffset += size; + return new ListRecordIterator(batch); + } + + @Override + public void close() { + // nothing to close + } + } + + private static class ListRecordIterator implements RecordReader.RecordIterator { + + private final List rows; + private int index = 0; + + ListRecordIterator(List rows) { + this.rows = rows; + } + + @Nullable + @Override + public InternalRow next() { + if (index < rows.size()) { + return rows.get(index++); + } + return null; + } + + @Override + public void releaseBatch() { + // nothing to release + } + } + + private static class TestIterator implements RecordReader.RecordIterator { + + private final InternalRow[] rows; + private int index; + private boolean released; + + TestIterator(InternalRow[] rows) { + this.rows = rows; + } + + @Nullable + @Override + public InternalRow next() { + if (index < rows.length) { + return rows[index++]; + } + return null; + } + + @Override + public void releaseBatch() { + released = true; + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java deleted file mode 100644 index 6dd7d61de631..000000000000 --- a/paimon-common/src/test/java/org/apache/paimon/reader/DataEvolutionIteratorTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.reader; - -import org.apache.paimon.data.InternalRow; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.InOrder; - -import java.io.IOException; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** Tests for {@link DataEvolutionIterator}. */ -public class DataEvolutionIteratorTest { - - private DataEvolutionRow mockRow; - private RecordReader.RecordIterator iterator1; - private RecordReader.RecordIterator iterator2; - private InternalRow row1; - private InternalRow row2; - - @BeforeEach - public void setUp() { - mockRow = mock(DataEvolutionRow.class); - iterator1 = mock(RecordReader.RecordIterator.class); - iterator2 = mock(RecordReader.RecordIterator.class); - row1 = mock(InternalRow.class); - row2 = mock(InternalRow.class); - } - - @Test - public void testNextWithData() throws IOException { - when(iterator1.next()).thenReturn(row1).thenReturn(null); - when(iterator2.next()).thenReturn(row2).thenReturn(null); - - DataEvolutionIterator evolutionIterator = - new DataEvolutionIterator( - mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2}); - - // First call to next() - InternalRow result = evolutionIterator.next(); - assertThat(result).isSameAs(mockRow); - - InOrder inOrder = inOrder(iterator1, iterator2, mockRow); - inOrder.verify(iterator1).next(); - inOrder.verify(mockRow).setRow(0, row1); - inOrder.verify(iterator2).next(); - inOrder.verify(mockRow).setRow(1, row2); - - // Second call to next() should return null - InternalRow nullResult = evolutionIterator.next(); - assertThat(nullResult).isNull(); - verify(iterator1, times(2)).next(); - verify(iterator2, times(1)).next(); // Should not be called again - } - - @Test - public void testNextWhenFirstIteratorIsEmpty() throws IOException { - when(iterator1.next()).thenReturn(null); - - DataEvolutionIterator evolutionIterator = - new DataEvolutionIterator( - mockRow, new RecordReader.RecordIterator[] {iterator1, iterator2}); - - InternalRow result = evolutionIterator.next(); - assertThat(result).isNull(); - - verify(iterator1).next(); - verify(iterator2, never()).next(); - verify(mockRow, never()).setRow(anyInt(), any()); - } - - @Test - public void testNextWithNullIteratorInArray() throws IOException { - when(iterator1.next()).thenReturn(row1).thenReturn(null); - when(iterator2.next()).thenReturn(row2).thenReturn(null); - - DataEvolutionIterator evolutionIterator = - new DataEvolutionIterator( - mockRow, new RecordReader.RecordIterator[] {iterator1, null, iterator2}); - - InternalRow result = evolutionIterator.next(); - assertThat(result).isSameAs(mockRow); - - InOrder inOrder = inOrder(iterator1, iterator2, mockRow); - inOrder.verify(iterator1).next(); - inOrder.verify(mockRow).setRow(0, row1); - inOrder.verify(iterator2).next(); - inOrder.verify(mockRow).setRow(2, row2); - verify(mockRow, never()).setRow(1, null); // Check that index 1 is skipped - - // Next call returns null - InternalRow nullResult = evolutionIterator.next(); - assertThat(nullResult).isNull(); - } - - @Test - public void testNextWithEmptyIterators() throws IOException { - DataEvolutionIterator evolutionIterator = - new DataEvolutionIterator(mockRow, new RecordReader.RecordIterator[0]); - - InternalRow result = evolutionIterator.next(); - assertThat(result).isSameAs(mockRow); - - verify(mockRow, never()).setRow(anyInt(), any()); - } - - @Test - public void testReleaseBatch() { - RecordReader.RecordIterator iterator3 = - mock(RecordReader.RecordIterator.class); - DataEvolutionIterator evolutionIterator = - new DataEvolutionIterator( - mockRow, - new RecordReader.RecordIterator[] {iterator1, null, iterator2, iterator3}); - - evolutionIterator.releaseBatch(); - - verify(iterator1).releaseBatch(); - verify(iterator2).releaseBatch(); - verify(iterator3).releaseBatch(); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 51cfe58b42fe..b6b4b882fb1e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -98,7 +98,8 @@ public DataEvolutionSplitRead newDataEvolutionRead() { schema, rowType, FileFormatDiscover.of(options), - pathFactory()); + pathFactory(), + options.readBatchSize()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java index 81f74fe0208b..3a78e8ad0cd5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java @@ -90,6 +90,7 @@ public class DataEvolutionSplitRead implements SplitRead { private final Map formatReaderMappings; private final Function schemaFetcher; @Nullable private VariantAccessInfo[] variantAccess; + private final int readBatchSize; protected RowType readRowType; @@ -99,7 +100,8 @@ public DataEvolutionSplitRead( TableSchema schema, RowType rowType, FileFormatDiscover formatDiscover, - FileStorePathFactory pathFactory) { + FileStorePathFactory pathFactory, + int readBatchSize) { this.fileIO = fileIO; final Map cache = new HashMap<>(); this.schemaFetcher = @@ -108,6 +110,7 @@ public DataEvolutionSplitRead( this.formatDiscover = formatDiscover; this.pathFactory = pathFactory; this.formatReaderMappings = new HashMap<>(); + this.readBatchSize = readBatchSize; this.readRowType = rowType; } @@ -327,7 +330,8 @@ private DataEvolutionFileReader createUnionReader( } } - return new DataEvolutionFileReader(rowOffsets, fieldOffsets, fileRecordReaders); + return new DataEvolutionFileReader( + rowOffsets, fieldOffsets, fileRecordReaders, readBatchSize); } private FileRecordReader createFileReader( From 4ca29ea08adc57a40c79b11697f5fcb7b97d0054 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Thu, 15 Jan 2026 23:35:30 +0800 Subject: [PATCH 2/2] remove the unused construct --- .../org/apache/paimon/reader/DataEvolutionFileReader.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java index 0fc8862cf10b..95576cfb308b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionFileReader.java @@ -18,7 +18,6 @@ package org.apache.paimon.reader; -import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.IOUtils; @@ -77,12 +76,6 @@ public class DataEvolutionFileReader implements RecordReader { /** Whether any inner reader has reached end of input. */ private boolean endOfInput; - @SuppressWarnings("unchecked") - public DataEvolutionFileReader( - int[] rowOffsets, int[] fieldOffsets, RecordReader[] readers) { - this(rowOffsets, fieldOffsets, readers, CoreOptions.READ_BATCH_SIZE.defaultValue()); - } - @SuppressWarnings("unchecked") public DataEvolutionFileReader( int[] rowOffsets,