Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rewrite-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ dependencies {

api("org.jspecify:jspecify:latest.release")

// @MustBeClosed on DataTableStore.getRows; CLASS-retention, not needed at runtime.
compileOnly("com.google.errorprone:error_prone_annotations:latest.release")

// Recipe marketplace
implementation("com.univocity:univocity-parsers:latest.release")

Expand Down
120 changes: 87 additions & 33 deletions rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.google.errorprone.annotations.MustBeClosed;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.CsvWriter;
Expand All @@ -37,8 +38,10 @@
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.openrewrite.internal.RecipeIntrospectionUtils.dataTableDescriptorFromDataTable;

Expand Down Expand Up @@ -219,12 +222,14 @@ public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row
}

@Deprecated
@MustBeClosed
@Override
public Stream<?> getRows(String dataTableName, @Nullable String group) {
RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group));
return readRows(dataTableName, group, meta);
}

@MustBeClosed
@SuppressWarnings("unchecked")
@Override
public <Row> Stream<Row> getRows(Class<? extends DataTable<Row>> dataTableClass, @Nullable String group) {
Expand All @@ -249,7 +254,6 @@ private <T> Stream<T> readRows(String dataTableName, @Nullable String group, @Nu
}
}

List<Object> allRows = new ArrayList<>();
//noinspection DataFlowIssue
File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
if (files == null) {
Expand All @@ -263,54 +267,104 @@ private <T> Stream<T> readRows(String dataTableName, @Nullable String group, @Nu
activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
}

int prefixCount = prefixColumns.size();
int suffixCount = suffixColumns.size();

// Select the files belonging to this table by reading only their small comment
// header, then parse their rows lazily so a whole table is never held in memory.
List<Path> matchingFiles = new ArrayList<>();
for (File file : files) {
if (activeWriterPaths.contains(file.toPath())) {
continue;
}
try (InputStream is = inputStreamFactory.apply(file.toPath())) {
DataTableDescriptor descriptor = readDescriptor(is);
if (descriptor == null ||
!descriptor.getName().equals(dataTableName) ||
!Objects.equals(descriptor.getGroup(), group)) {
continue;
if (descriptor != null &&
descriptor.getName().equals(dataTableName) &&
Objects.equals(descriptor.getGroup(), group)) {
matchingFiles.add(file.toPath());
}
// readDescriptor consumed comment lines; now parse the remaining CSV
// (header + data rows). Re-read the full file with CsvParser.
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

try (InputStream is = inputStreamFactory.apply(file.toPath())) {
CsvParserSettings settings = new CsvParserSettings();
settings.setMaxCharsPerColumn(-1);
settings.setHeaderExtractionEnabled(true);
settings.getFormat().setComment('#');
CsvParser parser = new CsvParser(settings);
parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8));

String[] row;
while ((row = parser.parseNext()) != null) {
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
String[] dataRow;
if (dataCount <= 0) {
dataRow = row;
} else {
dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
RowSpliterator rows = new RowSpliterator(matchingFiles, meta);
return (Stream<T>) StreamSupport.stream(rows, false).onClose(rows::close);
}

/**
* Streams rows from the matching files one at a time, keeping a single file open as it
* goes. The file is closed as soon as its last row is read, so a fully drained stream
* (the way every caller consumes one) releases its handle without any explicit close.
* Closing the stream early (try-with-resources) also releases the open file.
*/
private final class RowSpliterator extends Spliterators.AbstractSpliterator<Object> {
private final Iterator<Path> paths;
private final @Nullable RowMetadata meta;
private final int prefixCount = prefixColumns.size();
private final int suffixCount = suffixColumns.size();

private @Nullable InputStream is;
private @Nullable CsvParser parser;

RowSpliterator(List<Path> matchingFiles, @Nullable RowMetadata meta) {
super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
this.paths = matchingFiles.iterator();
this.meta = meta;
}

@Override
public boolean tryAdvance(Consumer<? super Object> action) {
while (true) {
if (parser == null) {
if (!paths.hasNext()) {
return false;
}
allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
open(paths.next());
}
parser.stopParsing();
} catch (IOException e) {
// Skip unreadable files
String[] row = parser.parseNext();
if (row == null) {
close();
continue;
}
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
String[] dataRow;
if (dataCount <= 0) {
dataRow = row;
} else {
dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
}
action.accept(meta != null ? meta.toRow(dataRow) : dataRow);
return true;
}
}

return (Stream<T>) allRows.stream();
// Assign the stream before creating the parser so a setup failure still leaves
// the open stream visible to close().
private void open(Path path) {
is = inputStreamFactory.apply(path);
CsvParserSettings settings = new CsvParserSettings();
settings.setMaxCharsPerColumn(-1);
settings.setHeaderExtractionEnabled(true);
settings.getFormat().setComment('#');
parser = new CsvParser(settings);
parser.beginParsing(new InputStreamReader(is, StandardCharsets.UTF_8));
}

void close() {
if (parser != null) {
parser.stopParsing();
parser = null;
}
if (is != null) {
try {
is.close();
} catch (IOException ignored) {
// best effort; the parser may have already closed it at end-of-input
}
is = null;
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.openrewrite;

import com.google.errorprone.annotations.MustBeClosed;
import org.jspecify.annotations.Nullable;

import java.util.Collection;
Expand All @@ -30,7 +31,7 @@
* </ul>
* <p>
* Each data table bucket is identified by the data table's class name and
* its {@link DataTable#getGroup() group} (for community tables) or

Check warning on line 34 in rewrite-core/src/main/java/org/openrewrite/DataTableStore.java

View workflow job for this annotation

GitHub Actions / build / build

reference not found: DataTable#getGroup()
* {@link DataTable#getInstanceName()} (for private tables).
*/
public interface DataTableStore {
Expand Down Expand Up @@ -63,6 +64,7 @@
* @deprecated Use {@link #getRows(Class)} or {@link #getRows(Class, String)} for type-safe deserialization.
*/
@Deprecated
@MustBeClosed
Stream<?> getRows(String dataTableName, @Nullable String group);

/**
Expand All @@ -74,6 +76,7 @@
* @param <Row> the row type
* @return a stream of typed rows, or an empty stream if no rows exist
*/
@MustBeClosed
@SuppressWarnings("unchecked")
default <Row> Stream<Row> getRows(Class<? extends DataTable<Row>> dataTableClass, @Nullable String group) {
return (Stream<Row>) getRows(dataTableClass.getName(), group);
Expand All @@ -86,6 +89,7 @@
* @param <Row> the row type
* @return a stream of typed rows, or an empty stream if no rows exist
*/
@MustBeClosed
default <Row> Stream<Row> getRows(Class<? extends DataTable<Row>> dataTableClass) {
return getRows(dataTableClass, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.openrewrite;

import com.google.errorprone.annotations.MustBeClosed;
import org.jspecify.annotations.Nullable;

import java.util.*;
Expand Down Expand Up @@ -58,6 +59,7 @@ public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row
bucket.rows.add(row);
}

@MustBeClosed
@Override
public Stream<?> getRows(String dataTableName, @Nullable String group) {
// Scan for matching bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.openrewrite;

import com.google.errorprone.annotations.MustBeClosed;
import org.jspecify.annotations.Nullable;

import java.util.Collection;
Expand All @@ -34,6 +35,7 @@ private NoOpDataTableStore() {
public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row row) {
}

@MustBeClosed
@Override
public Stream<?> getRows(String dataTableName, @Nullable String group) {
return Stream.empty();
Expand Down
11 changes: 7 additions & 4 deletions rewrite-core/src/main/java/org/openrewrite/RecipeRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Value
public class RecipeRun {
Expand Down Expand Up @@ -58,16 +59,18 @@ public <E> List<E> getDataTableRows(String name) {
@SuppressWarnings("unchecked")
@Deprecated
public <E> List<E> getDataTableRows(String name, @Nullable String group) {
return (List<E>) dataTableStore.getRows(name, group)
.collect(Collectors.toList());
try (Stream<?> rows = dataTableStore.getRows(name, group)) {
return (List<E>) rows.collect(Collectors.toList());
}
}

public <E> List<E> getDataTableRows(Class<? extends DataTable<E>> dataTableClass) {
return getDataTableRows(dataTableClass, null);
}

public <E> List<E> getDataTableRows(Class<? extends DataTable<E>> dataTableClass, @Nullable String group) {
return dataTableStore.getRows(dataTableClass, group)
.collect(Collectors.toList());
try (Stream<E> rows = dataTableStore.getRows(dataTableClass, group)) {
return rows.collect(Collectors.toList());
}
}
}
Loading
Loading