diff --git a/rewrite-core/build.gradle.kts b/rewrite-core/build.gradle.kts index 7955a85dc87..2e3f3db4253 100644 --- a/rewrite-core/build.gradle.kts +++ b/rewrite-core/build.gradle.kts @@ -22,6 +22,9 @@ dependencies { api("org.jspecify:jspecify:latest.release") + // @MustBeClosed on DataTableStore.getRows; CLASS-retention, not needed at runtime. + compileOnly("com.google.errorprone:error_prone_annotations:latest.release") + // Recipe marketplace implementation("com.univocity:univocity-parsers:latest.release") diff --git a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java index cd91320919c..982829f1fa6 100644 --- a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; +import com.google.errorprone.annotations.MustBeClosed; import com.univocity.parsers.csv.CsvParser; import com.univocity.parsers.csv.CsvParserSettings; import com.univocity.parsers.csv.CsvWriter; @@ -37,8 +38,10 @@ import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.openrewrite.internal.RecipeIntrospectionUtils.dataTableDescriptorFromDataTable; @@ -219,12 +222,14 @@ public void insertRow(DataTable dataTable, ExecutionContext ctx, Row } @Deprecated + @MustBeClosed @Override public Stream getRows(String dataTableName, @Nullable String group) { RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group)); return readRows(dataTableName, group, meta); } + @MustBeClosed @SuppressWarnings("unchecked") @Override public Stream getRows(Class> dataTableClass, @Nullable String group) { @@ -249,7 +254,6 @@ private Stream readRows(String dataTableName, @Nullable String group, @Nu } } - List allRows = new ArrayList<>(); //noinspection DataFlowIssue File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension)); if (files == null) { @@ -263,54 +267,104 @@ private Stream readRows(String dataTableName, @Nullable String group, @Nu activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension)); } - int prefixCount = prefixColumns.size(); - int suffixCount = suffixColumns.size(); - + // Select the files belonging to this table by reading only their small comment + // header, then parse their rows lazily so a whole table is never held in memory. + List matchingFiles = new ArrayList<>(); for (File file : files) { if (activeWriterPaths.contains(file.toPath())) { continue; } try (InputStream is = inputStreamFactory.apply(file.toPath())) { DataTableDescriptor descriptor = readDescriptor(is); - if (descriptor == null || - !descriptor.getName().equals(dataTableName) || - !Objects.equals(descriptor.getGroup(), group)) { - continue; + if (descriptor != null && + descriptor.getName().equals(dataTableName) && + Objects.equals(descriptor.getGroup(), group)) { + matchingFiles.add(file.toPath()); } - // readDescriptor consumed comment lines; now parse the remaining CSV - // (header + data rows). Re-read the full file with CsvParser. } catch (IOException e) { throw new UncheckedIOException(e); } + } - try (InputStream is = inputStreamFactory.apply(file.toPath())) { - CsvParserSettings settings = new CsvParserSettings(); - settings.setMaxCharsPerColumn(-1); - settings.setHeaderExtractionEnabled(true); - settings.getFormat().setComment('#'); - CsvParser parser = new CsvParser(settings); - parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8)); - - String[] row; - while ((row = parser.parseNext()) != null) { - // Strip prefix and suffix columns, returning only data table columns - int dataCount = row.length - prefixCount - suffixCount; - String[] dataRow; - if (dataCount <= 0) { - dataRow = row; - } else { - dataRow = new String[dataCount]; - System.arraycopy(row, prefixCount, dataRow, 0, dataCount); + RowSpliterator rows = new RowSpliterator(matchingFiles, meta); + return (Stream) StreamSupport.stream(rows, false).onClose(rows::close); + } + + /** + * Streams rows from the matching files one at a time, keeping a single file open as it + * goes. The file is closed as soon as its last row is read, so a fully drained stream + * (the way every caller consumes one) releases its handle without any explicit close. + * Closing the stream early (try-with-resources) also releases the open file. + */ + private final class RowSpliterator extends Spliterators.AbstractSpliterator { + private final Iterator paths; + private final @Nullable RowMetadata meta; + private final int prefixCount = prefixColumns.size(); + private final int suffixCount = suffixColumns.size(); + + private @Nullable InputStream is; + private @Nullable CsvParser parser; + + RowSpliterator(List matchingFiles, @Nullable RowMetadata meta) { + super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL); + this.paths = matchingFiles.iterator(); + this.meta = meta; + } + + @Override + public boolean tryAdvance(Consumer action) { + while (true) { + if (parser == null) { + if (!paths.hasNext()) { + return false; } - allRows.add(meta != null ? meta.toRow(dataRow) : dataRow); + open(paths.next()); } - parser.stopParsing(); - } catch (IOException e) { - // Skip unreadable files + String[] row = parser.parseNext(); + if (row == null) { + close(); + continue; + } + // Strip prefix and suffix columns, returning only data table columns + int dataCount = row.length - prefixCount - suffixCount; + String[] dataRow; + if (dataCount <= 0) { + dataRow = row; + } else { + dataRow = new String[dataCount]; + System.arraycopy(row, prefixCount, dataRow, 0, dataCount); + } + action.accept(meta != null ? meta.toRow(dataRow) : dataRow); + return true; } } - return (Stream) allRows.stream(); + // Assign the stream before creating the parser so a setup failure still leaves + // the open stream visible to close(). + private void open(Path path) { + is = inputStreamFactory.apply(path); + CsvParserSettings settings = new CsvParserSettings(); + settings.setMaxCharsPerColumn(-1); + settings.setHeaderExtractionEnabled(true); + settings.getFormat().setComment('#'); + parser = new CsvParser(settings); + parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8)); + } + + void close() { + if (parser != null) { + parser.stopParsing(); + parser = null; + } + if (is != null) { + try { + is.close(); + } catch (IOException ignored) { + // best effort; the parser may have already closed it at end-of-input + } + is = null; + } + } } @Override diff --git a/rewrite-core/src/main/java/org/openrewrite/DataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/DataTableStore.java index c936ef19b29..b2be9c7e77d 100644 --- a/rewrite-core/src/main/java/org/openrewrite/DataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/DataTableStore.java @@ -15,6 +15,7 @@ */ package org.openrewrite; +import com.google.errorprone.annotations.MustBeClosed; import org.jspecify.annotations.Nullable; import java.util.Collection; @@ -63,6 +64,7 @@ static DataTableStore noop() { * @deprecated Use {@link #getRows(Class)} or {@link #getRows(Class, String)} for type-safe deserialization. */ @Deprecated + @MustBeClosed Stream getRows(String dataTableName, @Nullable String group); /** @@ -74,6 +76,7 @@ static DataTableStore noop() { * @param the row type * @return a stream of typed rows, or an empty stream if no rows exist */ + @MustBeClosed @SuppressWarnings("unchecked") default Stream getRows(Class> dataTableClass, @Nullable String group) { return (Stream) getRows(dataTableClass.getName(), group); @@ -86,6 +89,7 @@ default Stream getRows(Class> dataTableClass * @param the row type * @return a stream of typed rows, or an empty stream if no rows exist */ + @MustBeClosed default Stream getRows(Class> dataTableClass) { return getRows(dataTableClass, null); } diff --git a/rewrite-core/src/main/java/org/openrewrite/InMemoryDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/InMemoryDataTableStore.java index 5ce6612fa47..716a03b0613 100644 --- a/rewrite-core/src/main/java/org/openrewrite/InMemoryDataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/InMemoryDataTableStore.java @@ -15,6 +15,7 @@ */ package org.openrewrite; +import com.google.errorprone.annotations.MustBeClosed; import org.jspecify.annotations.Nullable; import java.util.*; @@ -58,6 +59,7 @@ public void insertRow(DataTable dataTable, ExecutionContext ctx, Row bucket.rows.add(row); } + @MustBeClosed @Override public Stream getRows(String dataTableName, @Nullable String group) { // Scan for matching bucket diff --git a/rewrite-core/src/main/java/org/openrewrite/NoOpDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/NoOpDataTableStore.java index ab29b2076f3..a6cbd1c82ee 100644 --- a/rewrite-core/src/main/java/org/openrewrite/NoOpDataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/NoOpDataTableStore.java @@ -15,6 +15,7 @@ */ package org.openrewrite; +import com.google.errorprone.annotations.MustBeClosed; import org.jspecify.annotations.Nullable; import java.util.Collection; @@ -34,6 +35,7 @@ private NoOpDataTableStore() { public void insertRow(DataTable dataTable, ExecutionContext ctx, Row row) { } + @MustBeClosed @Override public Stream getRows(String dataTableName, @Nullable String group) { return Stream.empty(); diff --git a/rewrite-core/src/main/java/org/openrewrite/RecipeRun.java b/rewrite-core/src/main/java/org/openrewrite/RecipeRun.java index 8c55a3276ba..9087957bd11 100644 --- a/rewrite-core/src/main/java/org/openrewrite/RecipeRun.java +++ b/rewrite-core/src/main/java/org/openrewrite/RecipeRun.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; @Value public class RecipeRun { @@ -58,8 +59,9 @@ public List getDataTableRows(String name) { @SuppressWarnings("unchecked") @Deprecated public List getDataTableRows(String name, @Nullable String group) { - return (List) dataTableStore.getRows(name, group) - .collect(Collectors.toList()); + try (Stream rows = dataTableStore.getRows(name, group)) { + return (List) rows.collect(Collectors.toList()); + } } public List getDataTableRows(Class> dataTableClass) { @@ -67,7 +69,8 @@ public List getDataTableRows(Class> dataTableClass } public List getDataTableRows(Class> dataTableClass, @Nullable String group) { - return dataTableStore.getRows(dataTableClass, group) - .collect(Collectors.toList()); + try (Stream rows = dataTableStore.getRows(dataTableClass, group)) { + return rows.collect(Collectors.toList()); + } } } diff --git a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java index 838317c697e..d27e5b6936c 100644 --- a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java +++ b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java @@ -20,14 +20,21 @@ import org.junit.jupiter.api.io.TempDir; import java.io.FileInputStream; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,14 +75,19 @@ void insertAndRetrieveRows() { store.insertRow(table, ctx(), new TestTable.Row("alice")); store.insertRow(table, ctx(), new TestTable.Row("bob")); - List rows = store.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(2); } @Test void getRowsReturnsEmptyForMissingTable() { InMemoryDataTableStore store = new InMemoryDataTableStore(); - assertThat(store.getRows("nonexistent", null).count()).isZero(); + try (Stream stream = store.getRows("nonexistent", null)) { + assertThat(stream.count()).isZero(); + } } @Test @@ -100,7 +112,10 @@ void groupedTablesShareBucket() { store.insertRow(table2, ctx, new TestTable.Row("from-recipe-2")); // Both contributed to the same group bucket - List rows = store.getRows(TestTable.class.getName(), "shared").collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(TestTable.class.getName(), "shared")) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(2); // Only one bucket entry (first one wins as representative) @@ -161,7 +176,9 @@ void noopStoreDropsInserts() { TestTable table = new TestTable(Recipe.noop()); store.insertRow(table, ctx(), new TestTable.Row("dropped")); - assertThat(store.getRows(table.getName(), null).count()).isZero(); + try (Stream stream = store.getRows(table.getName(), null)) { + assertThat(stream.count()).isZero(); + } assertThat(store.getDataTables()).isEmpty(); } @@ -285,7 +302,10 @@ void csvStoreGetRowsReadsBackWrittenData(@TempDir Path tempDir) { store.insertRow(table, ctx(), new TestTable.Row("alice")); store.insertRow(table, ctx(), new TestTable.Row("bob")); - List rows = store.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(2); assertThat(rows.get(0)).isEqualTo(new TestTable.Row("alice")); assertThat(rows.get(1)).isEqualTo(new TestTable.Row("bob")); @@ -299,7 +319,10 @@ void csvStoreGetRowsMultipleColumns(@TempDir Path tempDir) { store.insertRow(table, ctx(), new MultiColTable.Row(1, "hello")); store.insertRow(table, ctx(), new MultiColTable.Row(2, "world")); - List rows = store.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(2); assertThat(rows.get(0)).isEqualTo(new MultiColTable.Row(1, "hello")); assertThat(rows.get(1)).isEqualTo(new MultiColTable.Row(2, "world")); @@ -312,7 +335,10 @@ void csvStoreGetRowsReturnsEmptyForMissingTable(@TempDir Path tempDir) { TestTable table = new TestTable(Recipe.noop()); store.insertRow(table, ctx(), new TestTable.Row("alice")); - List rows = store.getRows("nonexistent.Table", null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows("nonexistent.Table", null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).isEmpty(); } } @@ -325,11 +351,17 @@ void csvStoreGetRowsMatchesByGroup(@TempDir Path tempDir) { store.insertRow(grouped, ctx(), new TestTable.Row("grouped-row")); store.insertRow(ungrouped, ctx(), new TestTable.Row("ungrouped-row")); - List groupedRows = store.getRows(grouped.getName(), "group-a").collect(Collectors.toList()); + List groupedRows; + try (Stream stream = store.getRows(grouped.getName(), "group-a")) { + groupedRows = stream.collect(Collectors.toList()); + } assertThat(groupedRows).hasSize(1); assertThat(groupedRows.getFirst()).isEqualTo(new TestTable.Row("grouped-row")); - List ungroupedRows = store.getRows(ungrouped.getName(), null).collect(Collectors.toList()); + List ungroupedRows; + try (Stream stream = store.getRows(ungrouped.getName(), null)) { + ungroupedRows = stream.collect(Collectors.toList()); + } assertThat(ungroupedRows).hasSize(1); assertThat(ungroupedRows.getFirst()).isEqualTo(new TestTable.Row("ungrouped-row")); } @@ -359,7 +391,10 @@ void csvStoreGetRowsStripsPrefixAndSuffixColumns(@TempDir Path tempDir) { TestTable table = new TestTable(Recipe.noop()); store.insertRow(table, ctx(), new TestTable.Row("alice")); - List rows = store.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(1); // Should only contain the data column, not prefix/suffix assertThat(rows.getFirst()).isEqualTo(new TestTable.Row("alice")); @@ -376,7 +411,10 @@ void csvStoreGetRowsAfterClose(@TempDir Path tempDir) { // Read back from a new store instance pointing at the same directory try (CsvDataTableStore store2 = new CsvDataTableStore(tempDir)) { - List rows = store2.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store2.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(2); assertThat((String[]) rows.get(0)).containsExactly("alice"); assertThat((String[]) rows.get(1)).containsExactly("bob"); @@ -391,7 +429,10 @@ void csvStoreGetRowsHandlesSpecialCharacters(@TempDir Path tempDir) { store.insertRow(table, ctx(), new TestTable.Row("value with \"quotes\"")); store.insertRow(table, ctx(), new TestTable.Row("value with\nnewline")); - List rows = store.getRows(table.getName(), null).collect(Collectors.toList()); + List rows; + try (Stream stream = store.getRows(table.getName(), null)) { + rows = stream.collect(Collectors.toList()); + } assertThat(rows).hasSize(3); assertThat(rows.get(0)).isEqualTo(new TestTable.Row("value with, comma")); assertThat(rows.get(1)).isEqualTo(new TestTable.Row("value with \"quotes\"")); @@ -399,6 +440,60 @@ void csvStoreGetRowsHandlesSpecialCharacters(@TempDir Path tempDir) { } } + @Test + void csvStoreGetRowsClosesEveryOpenedFileHandle(@TempDir Path tempDir) { + AtomicInteger opened = new AtomicInteger(); + AtomicInteger closed = new AtomicInteger(); + Function countingInput = path -> { + try { + opened.incrementAndGet(); + return new FilterInputStream(Files.newInputStream(path)) { + private boolean counted; + + @Override + public void close() throws IOException { + if (!counted) { + counted = true; + closed.incrementAndGet(); + } + super.close(); + } + }; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + Function appendOutput = path -> { + try { + return Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; + + try (CsvDataTableStore store = new CsvDataTableStore( + tempDir, appendOutput, countingInput, ".csv", Collections.emptyMap(), Collections.emptyMap())) { + TestTable table = new TestTable(Recipe.noop()); + List names = List.of("alice", "bob", "carol", "dave", "erin", "frank"); + for (String name : names) { + store.insertRow(table, ctx(), new TestTable.Row(name)); + } + + List all; + try (Stream rows = store.getRows(TestTable.class)) { + all = rows.collect(Collectors.toList()); + } + assertThat(all).extracting(TestTable.Row::getName).containsExactlyElementsOf(names); + + assertThat(opened.get()) + .as("a lazy row-parsing stream was opened in addition to the header scan") + .isGreaterThanOrEqualTo(2); + assertThat(closed.get()) + .as("reading the data table back closes every input it opened") + .isEqualTo(opened.get()); + } + } + // ========================================================================= // CsvDataTableStore.fileKey // ========================================================================= @@ -532,7 +627,10 @@ void csvStoreIntermixedWritesAndReads(@TempDir Path tempDir) { store.insertRow(table, ctx(), new TestTable.Row("bob")); // Mid-run read (closes the writer internally) - List firstRead = store.getRows(table.getName(), null).collect(Collectors.toList()); + List firstRead; + try (Stream stream = store.getRows(table.getName(), null)) { + firstRead = stream.collect(Collectors.toList()); + } assertThat(firstRead).hasSize(2); assertThat(firstRead.get(0)).isEqualTo(new TestTable.Row("alice")); assertThat(firstRead.get(1)).isEqualTo(new TestTable.Row("bob")); @@ -541,7 +639,10 @@ void csvStoreIntermixedWritesAndReads(@TempDir Path tempDir) { store.insertRow(table, ctx(), new TestTable.Row("charlie")); // Second read should see all three rows - List secondRead = store.getRows(table.getName(), null).collect(Collectors.toList()); + List secondRead; + try (Stream stream = store.getRows(table.getName(), null)) { + secondRead = stream.collect(Collectors.toList()); + } assertThat(secondRead).hasSize(3); assertThat(secondRead.get(0)).isEqualTo(new TestTable.Row("alice")); assertThat(secondRead.get(1)).isEqualTo(new TestTable.Row("bob")); diff --git a/rewrite-core/src/test/java/org/openrewrite/RecipeRunTest.java b/rewrite-core/src/test/java/org/openrewrite/RecipeRunTest.java index fcef17e0da1..a9e503d6ecf 100644 --- a/rewrite-core/src/test/java/org/openrewrite/RecipeRunTest.java +++ b/rewrite-core/src/test/java/org/openrewrite/RecipeRunTest.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.util.List; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.openrewrite.test.SourceSpecs.text; @@ -81,8 +82,10 @@ void dataTableWithMultilineValue() { .findFirst().orElseThrow(); // Verify the row has multiline content - List rows = store.getRows(parseFailuresDt.getName(), parseFailuresDt.getGroup()) - .toList(); + List rows; + try (Stream stream = store.getRows(parseFailuresDt.getName(), parseFailuresDt.getGroup())) { + rows = stream.toList(); + } assertThat(rows).hasSize(1); }), text( diff --git a/rewrite-test/src/main/java/org/openrewrite/test/RecipeSpec.java b/rewrite-test/src/main/java/org/openrewrite/test/RecipeSpec.java index f87ba6f4ea2..9f818e3039d 100644 --- a/rewrite-test/src/main/java/org/openrewrite/test/RecipeSpec.java +++ b/rewrite-test/src/main/java/org/openrewrite/test/RecipeSpec.java @@ -37,9 +37,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -228,9 +230,12 @@ public RecipeSpec dataTable(Class rowType, UncheckedConsumer> ext DataTableStore store = run.getDataTableStore(); for (DataTable dt : store.getDataTables()) { if (dt.getType().equals(rowType)) { - @SuppressWarnings("unchecked") - List rows = (List) store.getRows(dt.getName(), dt.getGroup()) - .collect(java.util.stream.Collectors.toList()); + List rows; + try (Stream stream = store.getRows(dt.getName(), dt.getGroup())) { + @SuppressWarnings("unchecked") + List typed = (List) stream.collect(toList()); + rows = typed; + } assertThat(rows).isNotNull(); assertThat(rows).isNotEmpty(); extract.accept(rows); @@ -266,8 +271,10 @@ public RecipeSpec dataTableAsCsv(String name, String expect) { } } assertThat(dataTable).isNotNull(); - List rows = store.getRows(dataTable.getName(), dataTable.getGroup()) - .collect(java.util.stream.Collectors.toList()); + List rows; + try (Stream stream = store.getRows(dataTable.getName(), dataTable.getGroup())) { + rows = stream.collect(toList()); + } StringWriter writer = new StringWriter(); CsvMapper mapper = CsvMapper.builder() .disable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)