diff --git a/api/src/main/java/org/apache/iceberg/FileFormat.java b/api/src/main/java/org/apache/iceberg/FileFormat.java index 892475780bbf..41e997725609 100644 --- a/api/src/main/java/org/apache/iceberg/FileFormat.java +++ b/api/src/main/java/org/apache/iceberg/FileFormat.java @@ -27,6 +27,8 @@ public enum FileFormat { ORC("orc", true), PARQUET("parquet", true), AVRO("avro", true), + LANCE("lance", true), + NIMBLE("nimble", true), METADATA("metadata.json", false); private final String ext; diff --git a/build.gradle b/build.gradle index bc9c75c69f5d..cb145e540d5a 100644 --- a/build.gradle +++ b/build.gradle @@ -107,7 +107,7 @@ dependencyRecommendations { } def projectVersion = getProjectVersion() -final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-common", "iceberg-data"] +final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-lance", "iceberg-common", "iceberg-data"] allprojects { group = "org.apache.iceberg" @@ -357,6 +357,7 @@ project(':iceberg-data') { implementation project(':iceberg-core') compileOnly project(':iceberg-parquet') compileOnly project(':iceberg-orc') + compileOnly project(':iceberg-lance') compileOnly("org.apache.hadoop:hadoop-common") { exclude group: 'commons-beanutils' exclude group: 'org.apache.avro', module: 'avro' @@ -693,6 +694,40 @@ project(':iceberg-orc') { } } +project(':iceberg-lance') { + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-common') + implementation project(':iceberg-core') + implementation("org.apache.avro:avro") { + exclude group: 'org.tukaani' // xz compression is not supported + } + +// implementation("org.apache.orc:orc-core::nohive") { +// exclude group: 'org.apache.hadoop' +// exclude group: 'commons-lang' +// // These artifacts are shaded and included in the orc-core fat jar +// exclude group: 
'com.google.protobuf', module: 'protobuf-java' +// exclude group: 'org.apache.hive', module: 'hive-storage-api' +// } + + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'commons-beanutils' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + compileOnly("org.apache.hadoop:hadoop-client") { + exclude group: 'org.apache.avro', module: 'avro' + } + + testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + testImplementation project(':iceberg-common') +// testImplementation 'org.apache.orc:orc-tools' + } +} + project(':iceberg-parquet') { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 976b98b0a9fe..c183c12cd8fc 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -148,6 +148,9 @@ public DataWriter newDataWriter( return orcBuilder.build(); + case LANCE: + // TODO LANCE-1: Implement Lance writer + default: throw new UnsupportedOperationException( "Unsupported data file format: " + dataFileFormat); diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..253a7866e641 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -114,6 +114,9 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .overwrite() .build(); + case LANCE: + // TODO LANCE-1: Implement Lance writer + default: throw new UnsupportedOperationException( "Cannot write 
unknown file format: " + fileFormat); @@ -186,6 +189,8 @@ public EqualityDeleteWriter newEqDeleteWriter( .equalityFieldIds(equalityFieldIds) .buildEqualityWriter(); + case NIMBLE: + // TODO NIMBLE-1: Implement Nimble delete writer default: throw new UnsupportedOperationException( "Cannot write equality-deletes for unsupported file format: " + format); diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java index 3637bf00bd58..a849dee61f0e 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java @@ -27,6 +27,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.lance.GenericLanceReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.expressions.Evaluator; @@ -37,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.lance.Lance; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -141,7 +143,16 @@ private CloseableIterable openFile(FileScanTask task, Schema fileProject .filter(task.residual()); return orc.build(); - + case LANCE: + Lance.ReadBuilder lance = + Lance.read(input) + .project(fileProjection) + .createReaderFunc( + fileSchema -> + GenericLanceReader.buildReader(fileProjection, fileSchema, partition)) + .split(task.start(), task.length()) + .filter(task.residual()); + return lance.build(); default: throw new UnsupportedOperationException( String.format( diff --git a/lance/src/main/java/org/apache/iceberg/data/lance/GenericLanceReader.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.data.lance;

import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.lance.LanceRowReader;
import org.apache.iceberg.lance.LanceValueReader;
import org.apache.iceberg.lance.TypeDescription;
import org.apache.iceberg.lance.VectorizedRowBatch;

/**
 * Generic-record row reader for Lance data files.
 *
 * <p>NOTE(review): this is a skeleton mirroring {@code GenericOrcReader}. The value-reader tree is
 * not built yet (TODO LANCE-1), so {@link #read(VectorizedRowBatch, int)} currently returns null.
 */
public class GenericLanceReader implements LanceRowReader<Record> {
  // Root of the per-column value-reader tree; always null until LANCE-1 lands.
  private final LanceValueReader<?> reader;

  public GenericLanceReader(
      Schema expectedSchema, TypeDescription readLanceSchema, Map<Integer, ?> idToConstant) {
    // TODO LANCE-1: build the reader tree, mirroring GenericOrcReader:
    //   this.reader =
    //       LanceSchemaWithTypeVisitor.visit(
    //           expectedSchema, readLanceSchema, new ReadBuilder(idToConstant));
    this.reader = null;
  }

  /** Builds a reader with no partition/metadata constants. */
  public static LanceRowReader<Record> buildReader(
      Schema expectedSchema, TypeDescription fileSchema) {
    return new GenericLanceReader(expectedSchema, fileSchema, Collections.emptyMap());
  }

  /**
   * Builds a reader.
   *
   * @param expectedSchema projected Iceberg schema to produce
   * @param fileSchema the Lance file schema
   * @param idToConstant constant values (e.g. partition values) keyed by Iceberg field id
   */
  public static LanceRowReader<Record> buildReader(
      Schema expectedSchema, TypeDescription fileSchema, Map<Integer, ?> idToConstant) {
    return new GenericLanceReader(expectedSchema, fileSchema, idToConstant);
  }

  @Override
  public Record read(VectorizedRowBatch batch, int row) {
    // TODO LANCE-1: delegate to the value reader once it is built, e.g.
    //   return (Record) reader.read(new StructColumnVector(batch.size, batch.cols), row);
    return null;
  }

  @Override
  public void setBatchContext(long batchOffsetInFile) {
    // Fix: 'reader' is always null until LANCE-1 lands, so the previous unguarded call was a
    // guaranteed NullPointerException for every batch.
    if (reader != null) {
      reader.setBatchContext(batchOffsetInFile);
    }
  }

  // TODO LANCE-1: port the ReadBuilder (a LanceSchemaWithTypeVisitor) from
  // GenericOrcReader.ReadBuilder, mapping Lance primitive categories to the readers in
  // GenericLanceReaders / LanceValueReaders. When porting, fix the copied error messages to say
  // "Lance type" rather than "ORC type".
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.data.lance;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.lance.ColumnVector;
import org.apache.iceberg.lance.LanceValueReader;
import org.apache.iceberg.lance.LanceValueReaders;
import org.apache.iceberg.types.Types;

/**
 * Factories for {@link LanceValueReader}s that produce generic Iceberg {@link Record}s and plain
 * Java values.
 *
 * <p>NOTE(review): mirrors {@code GenericOrcReaders}. All vector decoding is TODO LANCE-1 — every
 * reader below returns null until {@link ColumnVector} exposes typed data; see the per-reader TODO
 * comments for the intended ORC-style decoding.
 */
public class GenericLanceReaders {
  private GenericLanceReaders() {}

  public static LanceValueReader<Record> struct(
      List<LanceValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
    return new StructReader(readers, struct, idToConstant);
  }

  public static LanceValueReader<List<?>> array(LanceValueReader<?> elementReader) {
    return new ListReader(elementReader);
  }

  public static LanceValueReader<Map<?, ?>> map(
      LanceValueReader<?> keyReader, LanceValueReader<?> valueReader) {
    return new MapReader(keyReader, valueReader);
  }

  public static LanceValueReader<OffsetDateTime> timestampTzs() {
    return TimestampTzReader.INSTANCE;
  }

  public static LanceValueReader<BigDecimal> decimals() {
    return DecimalReader.INSTANCE;
  }

  public static LanceValueReader<String> strings() {
    return StringReader.INSTANCE;
  }

  public static LanceValueReader<UUID> uuids() {
    return UUIDReader.INSTANCE;
  }

  public static LanceValueReader<ByteBuffer> bytes() {
    return BytesReader.INSTANCE;
  }

  public static LanceValueReader<LocalTime> times() {
    return TimeReader.INSTANCE;
  }

  public static LanceValueReader<LocalDate> dates() {
    return DateReader.INSTANCE;
  }

  public static LanceValueReader<LocalDateTime> timestamps() {
    return TimestampReader.INSTANCE;
  }

  private static class TimestampTzReader implements LanceValueReader<OffsetDateTime> {
    private static final TimestampTzReader INSTANCE = new TimestampTzReader();

    private TimestampTzReader() {}

    @Override
    public OffsetDateTime nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode epoch seconds + nanos at UTC, as in
      // GenericOrcReaders.TimestampTzReader.
      return null;
    }
  }

  private static class TimeReader implements LanceValueReader<LocalTime> {
    private static final TimeReader INSTANCE = new TimeReader();

    private TimeReader() {}

    @Override
    public LocalTime nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode micros-of-day via DateTimeUtil.timeFromMicros.
      return null;
    }
  }

  private static class DateReader implements LanceValueReader<LocalDate> {
    private static final DateReader INSTANCE = new DateReader();

    private DateReader() {}

    @Override
    public LocalDate nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode days-from-epoch via DateTimeUtil.dateFromDays.
      return null;
    }
  }

  private static class TimestampReader implements LanceValueReader<LocalDateTime> {
    private static final TimestampReader INSTANCE = new TimestampReader();

    private TimestampReader() {}

    @Override
    public LocalDateTime nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode epoch seconds + nanos to a LocalDateTime (UTC).
      return null;
    }
  }

  private static class DecimalReader implements LanceValueReader<BigDecimal> {
    private static final DecimalReader INSTANCE = new DecimalReader();

    private DecimalReader() {}

    @Override
    public BigDecimal nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode the vector's decimal value at the vector's declared scale.
      return null;
    }
  }

  private static class StringReader implements LanceValueReader<String> {
    private static final StringReader INSTANCE = new StringReader();

    private StringReader() {}

    @Override
    public String nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: decode the byte range for this row as UTF-8.
      return null;
    }
  }

  private static class UUIDReader implements LanceValueReader<UUID> {
    private static final UUIDReader INSTANCE = new UUIDReader();

    private UUIDReader() {}

    @Override
    public UUID nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: wrap the 16-byte value and convert via UUIDUtil.convert.
      return null;
    }
  }

  private static class BytesReader implements LanceValueReader<ByteBuffer> {
    private static final BytesReader INSTANCE = new BytesReader();

    private BytesReader() {}

    @Override
    public ByteBuffer nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: wrap this row's byte range in a ByteBuffer without copying.
      return null;
    }
  }

  private static class StructReader extends LanceValueReaders.StructReader<Record> {
    private final GenericRecord template;

    protected StructReader(
        List<LanceValueReader<?>> readers,
        Types.StructType structType,
        Map<Integer, ?> idToConstant) {
      super(readers, structType, idToConstant);
      this.template = structType != null ? GenericRecord.create(structType) : null;
    }

    @Override
    protected Record create() {
      // GenericRecord.copy() is more performant than GenericRecord.create(StructType) since
      // NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
      // NOTE(review): assumes structType was non-null at construction; template.copy() would NPE
      // otherwise — confirm callers never pass a null struct type.
      return template.copy();
    }

    @Override
    protected void set(Record struct, int pos, Object value) {
      struct.set(pos, value);
    }
  }

  private static class MapReader implements LanceValueReader<Map<?, ?>> {
    private final LanceValueReader<?> keyReader;
    private final LanceValueReader<?> valueReader;

    private MapReader(LanceValueReader<?> keyReader, LanceValueReader<?> valueReader) {
      this.keyReader = keyReader;
      this.valueReader = valueReader;
    }

    @Override
    public Map<?, ?> nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: iterate this row's key/value sub-vector range and build a Map, as in
      // GenericOrcReaders.MapReader.
      return null;
    }

    @Override
    public void setBatchContext(long batchOffsetInFile) {
      keyReader.setBatchContext(batchOffsetInFile);
      valueReader.setBatchContext(batchOffsetInFile);
    }
  }

  private static class ListReader implements LanceValueReader<List<?>> {
    private final LanceValueReader<?> elementReader;

    private ListReader(LanceValueReader<?> elementReader) {
      this.elementReader = elementReader;
    }

    @Override
    public List<?> nonNullRead(ColumnVector vector, int row) {
      // TODO LANCE-1: iterate this row's element sub-vector range and build a List, as in
      // GenericOrcReaders.ListReader.
      return null;
    }

    @Override
    public void setBatchContext(long batchOffsetInFile) {
      elementReader.setBatchContext(batchOffsetInFile);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.lance;

/**
 * Base type for a column of values within a {@link VectorizedRowBatch}.
 *
 * <p>Placeholder: carries no data yet. TODO LANCE-1: add null/repeating metadata and typed
 * subclasses (modeled on Apache ORC's ColumnVector hierarchy, which this package mirrors).
 */
public class ColumnVector {}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +public class DoubleColumnVector extends ColumnVector {} diff --git a/lance/src/main/java/org/apache/iceberg/lance/Lance.java b/lance/src/main/java/org/apache/iceberg/lance/Lance.java new file mode 100644 index 000000000000..87be6ed66297 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/Lance.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.lance;

import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * Entry point for reading Lance files, mirroring the {@code ORC} utility class.
 *
 * <p>NOTE(review): the builder collects all read options but {@link ReadBuilder#build()} does not
 * forward them to {@link LanceIterable} yet (TODO LANCE-1).
 */
public class Lance {
  private Lance() {}

  /** Returns a read builder for {@code file}. */
  public static ReadBuilder read(InputFile file) {
    return new ReadBuilder(file);
  }

  /** Configures and creates a {@link CloseableIterable} over a Lance file. */
  public static class ReadBuilder {
    private final InputFile file;
    private final Configuration conf;
    private Schema schema = null;
    private Long start = null;
    private Long length = null;
    private Expression filter = null;
    private boolean caseSensitive = true;
    private NameMapping nameMapping = null;

    // Builds a row reader from the file schema; unused until build() is wired up (TODO LANCE-1).
    private Function<TypeDescription, LanceRowReader<?>> readerFunc;

    private ReadBuilder(InputFile file) {
      // Fix: restore the null check that was commented out — a null file would otherwise only
      // surface later as an NPE far from the caller.
      Preconditions.checkNotNull(file, "Input file cannot be null");
      this.file = file;
      if (file instanceof HadoopInputFile) {
        this.conf = new Configuration(((HadoopInputFile) file).getConf());
      } else {
        this.conf = new Configuration();
      }
    }

    /** Restricts the read to the byte range {@code [newStart, newStart + newLength)}. */
    public ReadBuilder split(long newStart, long newLength) {
      this.start = newStart;
      this.length = newLength;
      return this;
    }

    /** Sets the projected Iceberg schema. Required before {@link #build()}. */
    public ReadBuilder project(Schema newSchema) {
      this.schema = newSchema;
      return this;
    }

    public ReadBuilder caseSensitive(boolean newCaseSensitive) {
      // TODO LANCE-1: propagate case sensitivity to the underlying reader configuration
      // (the ORC equivalent sets OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE here).
      this.caseSensitive = newCaseSensitive;
      return this;
    }

    /** Sets an arbitrary Hadoop configuration property for the read. */
    public ReadBuilder config(String property, String value) {
      conf.set(property, value);
      return this;
    }

    /** Sets the function that builds a row reader from the file schema. */
    public ReadBuilder createReaderFunc(
        Function<TypeDescription, LanceRowReader<?>> readerFunction) {
      // TODO LANCE-1: once a batched reader function exists, reject setting both (as ORC does).
      this.readerFunc = readerFunction;
      return this;
    }

    /** Sets a residual filter expression for the read. */
    public ReadBuilder filter(Expression newFilter) {
      this.filter = newFilter;
      return this;
    }

    /** Sets the name mapping used to resolve columns in files without field ids. */
    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
      this.nameMapping = newNameMapping;
      return this;
    }

    public <D> CloseableIterable<D> build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      // TODO LANCE-1: pass file, conf, schema, nameMapping, start, length, readerFunc,
      // caseSensitive, and filter into the iterable; they are currently collected but unused.
      return new LanceIterable<>();
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +import java.util.Spliterator; +import java.util.function.Consumer; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +class LanceIterable extends CloseableGroup implements CloseableIterable { + + @Override + public CloseableIterator iterator() { + return null; + } + + @Override + public void forEach(Consumer action) { + CloseableIterable.super.forEach(action); + } + + @Override + public Spliterator spliterator() { + return CloseableIterable.super.spliterator(); + } +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/LanceRowReader.java b/lance/src/main/java/org/apache/iceberg/lance/LanceRowReader.java new file mode 100644 index 000000000000..22c50a1c0dc8 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceRowReader.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +/** Used for implementing Lance row readers. */ +public interface LanceRowReader { + + /** Reads a row. */ + T read(VectorizedRowBatch batch, int row); + + void setBatchContext(long batchOffsetInFile); +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/LanceValueReader.java b/lance/src/main/java/org/apache/iceberg/lance/LanceValueReader.java new file mode 100644 index 000000000000..f590364b0206 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceValueReader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +public interface LanceValueReader { + default T read(ColumnVector vector, int row) { + // int rowIndex = vector.isRepeating ? 
0 : row; + // if (!vector.noNulls && vector.isNull[rowIndex]) { + // return null; + // } else { + // return nonNullRead(vector, rowIndex); + // } + return null; + } + + T nonNullRead(ColumnVector vector, int row); + + default void setBatchContext(long batchOffsetInFile) {} +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/LanceValueReaders.java b/lance/src/main/java/org/apache/iceberg/lance/LanceValueReaders.java new file mode 100644 index 000000000000..1ba303c3c981 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceValueReaders.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.lance;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Types;

/**
 * Static factory methods for {@link LanceValueReader} implementations.
 *
 * <p>All primitive readers are currently placeholders: each returns a fixed default value, and the
 * commented-out bodies sketch the intended reads from ORC-style column vectors (the shape appears
 * to mirror the iceberg-orc module's {@code OrcValueReaders} — TODO confirm as the Lance reader is
 * implemented). Only the constant/metadata plumbing ({@link ConstantReader},
 * {@link RowPositionReader}, and the constant/metadata resolution in {@link StructReader}) is
 * functional today.
 */
public class LanceValueReaders {
  private LanceValueReaders() {} // static utility class, no instances

  public static LanceValueReader<Boolean> booleans() {
    return BooleanReader.INSTANCE;
  }

  public static LanceValueReader<Integer> ints() {
    return IntegerReader.INSTANCE;
  }

  public static LanceValueReader<Long> longs() {
    return LongReader.INSTANCE;
  }

  public static LanceValueReader<Float> floats() {
    return FloatReader.INSTANCE;
  }

  public static LanceValueReader<Double> doubles() {
    return DoubleReader.INSTANCE;
  }

  public static LanceValueReader<byte[]> bytes() {
    return BytesReader.INSTANCE;
  }

  /** Returns a reader that yields {@code constant} for every row, ignoring the column vector. */
  public static <C> LanceValueReader<C> constants(C constant) {
    return new ConstantReader<>(constant);
  }

  private static class BooleanReader implements LanceValueReader<Boolean> {
    static final BooleanReader INSTANCE = new BooleanReader();

    private BooleanReader() {}

    @Override
    public Boolean nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended: ((LongColumnVector) vector).vector[row] != 0
      return true;
    }
  }

  private static class IntegerReader implements LanceValueReader<Integer> {
    static final IntegerReader INSTANCE = new IntegerReader();

    private IntegerReader() {}

    @Override
    public Integer nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended: (int) ((LongColumnVector) vector).vector[row]
      return 0;
    }
  }

  private static class LongReader implements LanceValueReader<Long> {
    static final LongReader INSTANCE = new LongReader();

    private LongReader() {}

    @Override
    public Long nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended: ((LongColumnVector) vector).vector[row]
      return 0L;
    }
  }

  private static class FloatReader implements LanceValueReader<Float> {
    static final FloatReader INSTANCE = new FloatReader();

    private FloatReader() {}

    @Override
    public Float nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended: (float) ((DoubleColumnVector) vector).vector[row]
      return 0f;
    }
  }

  private static class DoubleReader implements LanceValueReader<Double> {
    static final DoubleReader INSTANCE = new DoubleReader();

    private DoubleReader() {}

    @Override
    public Double nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended: ((DoubleColumnVector) vector).vector[row]
      return 0d;
    }
  }

  private static class BytesReader implements LanceValueReader<byte[]> {
    static final BytesReader INSTANCE = new BytesReader();

    private BytesReader() {}

    @Override
    public byte[] nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended:
      //   BytesColumnVector bytesVector = (BytesColumnVector) vector;
      //   return Arrays.copyOfRange(
      //       bytesVector.vector[row],
      //       bytesVector.start[row],
      //       bytesVector.start[row] + bytesVector.length[row]);
      return new byte[] {};
    }
  }

  /**
   * Base reader for struct values.
   *
   * <p>Constant and metadata columns (partition constants from {@code idToConstant}, row position,
   * {@code _deleted}, and any other metadata column) are resolved at construction time; the
   * remaining data fields consume the supplied {@code readers} in field order.
   */
  public abstract static class StructReader<T> implements LanceValueReader<T> {
    private final LanceValueReader<?>[] readers;
    // true at positions filled from constants/metadata rather than file column vectors
    private final boolean[] isConstantOrMetadataField;

    protected StructReader(
        List<LanceValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
      List<Types.NestedField> fields = struct.fields();
      this.readers = new LanceValueReader<?>[fields.size()];
      this.isConstantOrMetadataField = new boolean[fields.size()];
      for (int pos = 0, readerIndex = 0; pos < fields.size(); pos += 1) {
        Types.NestedField field = fields.get(pos);
        if (idToConstant.containsKey(field.fieldId())) {
          this.isConstantOrMetadataField[pos] = true;
          this.readers[pos] = constants(idToConstant.get(field.fieldId()));
        } else if (field.equals(MetadataColumns.ROW_POSITION)) {
          this.isConstantOrMetadataField[pos] = true;
          this.readers[pos] = new RowPositionReader();
        } else if (field.equals(MetadataColumns.IS_DELETED)) {
          this.isConstantOrMetadataField[pos] = true;
          this.readers[pos] = constants(false);
        } else if (MetadataColumns.isMetadataColumn(field.name())) {
          // in case of any other metadata field, fill with nulls
          this.isConstantOrMetadataField[pos] = true;
          this.readers[pos] = constants(null);
        } else {
          // data field: consume the next user-supplied reader
          this.readers[pos] = readers.get(readerIndex++);
        }
      }
    }

    /** Creates an empty struct instance to be populated via {@link #set}. */
    protected abstract T create();

    /** Sets field {@code pos} of {@code struct} to {@code value}. */
    protected abstract void set(T struct, int pos, Object value);

    public LanceValueReader<?> reader(int pos) {
      return readers[pos];
    }

    @Override
    public T nonNullRead(ColumnVector vector, int row) {
      // TODO: placeholder — intended:
      //   StructColumnVector structVector = (StructColumnVector) vector;
      //   return readInternal(create(), structVector.fields, row);
      return null;
    }

    // Retained for the eventual nonNullRead implementation; currently unreferenced because the
    // call site above is commented out.
    private T readInternal(T struct, ColumnVector[] columnVectors, int row) {
      for (int c = 0, vectorIndex = 0; c < readers.length; ++c) {
        ColumnVector vector;
        if (isConstantOrMetadataField[c]) {
          vector = null; // constant/metadata readers ignore the vector argument
        } else {
          vector = columnVectors[vectorIndex];
          vectorIndex++;
        }
        set(struct, c, reader(c).read(vector, row));
      }
      return struct;
    }

    @Override
    public void setBatchContext(long batchOffsetInFile) {
      // propagate the batch offset so RowPositionReader can compute absolute row positions
      for (LanceValueReader<?> reader : readers) {
        reader.setBatchContext(batchOffsetInFile);
      }
    }
  }

  /** Reader that ignores the column vector and always returns a fixed constant. */
  private static class ConstantReader<C> implements LanceValueReader<C> {
    private final C constant;

    private ConstantReader(C constant) {
      this.constant = constant;
    }

    @Override
    public C read(ColumnVector ignored, int ignoredRow) {
      return constant;
    }

    @Override
    public C nonNullRead(ColumnVector ignored, int ignoredRow) {
      return constant;
    }
  }

  /** Computes the file-level row position as batch offset plus row index within the batch. */
  private static class RowPositionReader implements LanceValueReader<Long> {
    private long batchOffsetInFile;

    @Override
    public Long read(ColumnVector ignored, int row) {
      return batchOffsetInFile + row;
    }

    @Override
    public Long nonNullRead(ColumnVector ignored, int row) {
      throw new UnsupportedOperationException("Use RowPositionReader.read()");
    }

    @Override
    public void setBatchContext(long newBatchOffsetInFile) {
      this.batchOffsetInFile = newBatchOffsetInFile;
    }
  }
}
a/lance/src/main/java/org/apache/iceberg/lance/TypeDescription.java b/lance/src/main/java/org/apache/iceberg/lance/TypeDescription.java new file mode 100644 index 000000000000..7768b77f3c82 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/TypeDescription.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg.lance;

import java.io.Serializable;

/**
 * Placeholder schema-type descriptor for the Lance file format, by its shape modeled after ORC's
 * {@code TypeDescription} (same category names — TODO confirm intended divergence, if any).
 *
 * <p>Only the {@link Category} enumeration is functional. Ordering via {@link #compareTo} is a
 * stub that treats all instances as equal.
 */
public class TypeDescription implements Comparable<TypeDescription>, Serializable, Cloneable {

  /**
   * Stub ordering: every {@code TypeDescription} currently compares equal.
   *
   * <p>NOTE(review): this will be inconsistent with any future {@code equals}/{@code hashCode};
   * replace when the type model is fleshed out.
   */
  @Override
  public int compareTo(TypeDescription o) {
    return 0;
  }

  /** Type categories, keyed by their Hive/SQL-style type names. */
  public enum Category {
    BOOLEAN("boolean", true),
    BYTE("tinyint", true),
    SHORT("smallint", true),
    INT("int", true),
    LONG("bigint", true),
    FLOAT("float", true),
    DOUBLE("double", true),
    STRING("string", true),
    DATE("date", true),
    TIMESTAMP("timestamp", true),
    BINARY("binary", true),
    DECIMAL("decimal", true),
    VARCHAR("varchar", true),
    CHAR("char", true),
    LIST("array", false),
    MAP("map", false),
    STRUCT("struct", false),
    UNION("uniontype", false),
    TIMESTAMP_INSTANT("timestamp with local time zone", true);

    // whether values of this category are scalar (no nested children)
    private final boolean isPrimitive;
    // the SQL/Hive-style type name
    private final String name;

    Category(String name, boolean isPrimitive) {
      this.name = name;
      this.isPrimitive = isPrimitive;
    }

    public boolean isPrimitive() {
      return this.isPrimitive;
    }

    public String getName() {
      return this.name;
    }
  }
}
package org.apache.iceberg.lance;

/**
 * Placeholder for a Lance columnar row batch.
 *
 * <p>Currently empty; the sibling {@code LanceValueReader}/{@code LanceValueReaders} classes
 * reference ORC-style column vectors in their commented-out bodies, so this presumably will hold
 * the batch of column vectors produced by the Lance reader — TODO confirm intended shape when the
 * reader is implemented.
 */
public class VectorizedRowBatch {}