Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public enum FileFormat {
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
LANCE("lance", true),
NIMBLE("nimble", true),
METADATA("metadata.json", false);

private final String ext;
Expand Down
37 changes: 36 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ dependencyRecommendations {
}

def projectVersion = getProjectVersion()
final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-common", "iceberg-data"]
final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-lance", "iceberg-common", "iceberg-data"]

allprojects {
group = "org.apache.iceberg"
Expand Down Expand Up @@ -357,6 +357,7 @@ project(':iceberg-data') {
implementation project(':iceberg-core')
compileOnly project(':iceberg-parquet')
compileOnly project(':iceberg-orc')
compileOnly project(':iceberg-lance')
compileOnly("org.apache.hadoop:hadoop-common") {
exclude group: 'commons-beanutils'
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -693,6 +694,40 @@ project(':iceberg-orc') {
}
}

// Lance file format support module; mirrors the structure of the
// iceberg-orc / iceberg-parquet module definitions in this file.
project(':iceberg-lance') {
  dependencies {
    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    api project(':iceberg-api')
    implementation project(':iceberg-common')
    implementation project(':iceberg-core')
    implementation("org.apache.avro:avro") {
      exclude group: 'org.tukaani' // xz compression is not supported
    }

    // TODO LANCE-1: add the Lance core library dependency once its artifact
    // coordinates are settled. (The commented-out orc-core template that was
    // copied here from the iceberg-orc module has been removed.)

    compileOnly("org.apache.hadoop:hadoop-common") {
      exclude group: 'commons-beanutils'
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
    }
    compileOnly("org.apache.hadoop:hadoop-client") {
      exclude group: 'org.apache.avro', module: 'avro'
    }

    testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
    testImplementation project(':iceberg-common')
  }
}

project(':iceberg-parquet') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ public DataWriter<T> newDataWriter(

return orcBuilder.build();

case LANCE:
// TODO LANCE-1: Implement Lance writer

default:
throw new UnsupportedOperationException(
"Unsupported data file format: " + dataFileFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
.overwrite()
.build();

case LANCE:
// TODO LANCE-1: Implement Lance writer

default:
throw new UnsupportedOperationException(
"Cannot write unknown file format: " + fileFormat);
Expand Down Expand Up @@ -186,6 +189,8 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

case NIMBLE:
// TODO NIMBLE-1: Implement Nimble delete writer
default:
throw new UnsupportedOperationException(
"Cannot write equality-deletes for unsupported file format: " + format);
Expand Down
13 changes: 12 additions & 1 deletion data/src/main/java/org/apache/iceberg/data/GenericReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.lance.GenericLanceReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Evaluator;
Expand All @@ -37,6 +38,7 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.lance.Lance;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -141,7 +143,16 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
.filter(task.residual());

return orc.build();

case LANCE:
Lance.ReadBuilder lance =
Lance.read(input)
.project(fileProjection)
.createReaderFunc(
fileSchema ->
GenericLanceReader.buildReader(fileProjection, fileSchema, partition))
.split(task.start(), task.length())
.filter(task.residual());
return lance.build();
default:
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.lance;

import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.lance.LanceRowReader;
import org.apache.iceberg.lance.LanceValueReader;
import org.apache.iceberg.lance.TypeDescription;
import org.apache.iceberg.lance.VectorizedRowBatch;

/**
 * A {@link LanceRowReader} that produces generic Iceberg {@link Record}s from Lance row batches.
 *
 * <p>NOTE(review): this is an unimplemented stub — the schema-visitor wiring that builds the
 * value reader does not exist yet (TODO LANCE-1), so {@code reader} is always {@code null}.
 * The previous version silently returned {@code null} rows from {@link #read} and threw a bare
 * {@link NullPointerException} from {@link #setBatchContext}; both now fail fast with an
 * explicit {@link UnsupportedOperationException} so callers get a clear error instead of
 * corrupt data. The intended implementation should mirror {@code GenericOrcReader}, building
 * the reader with a Lance schema-with-type visitor keyed by Iceberg field ids.
 */
public class GenericLanceReader implements LanceRowReader<Record> {
  // Value reader built from the expected Iceberg schema and the file's Lance schema.
  // Always null until the visitor-based builder (TODO LANCE-1) is implemented.
  private final LanceValueReader<?> reader;

  /**
   * @param expectedSchema the Iceberg schema the caller expects rows to conform to
   * @param readLanceSchema the schema of the underlying Lance file
   * @param idToConstant constant values to inject by Iceberg field id (e.g. partition values)
   */
  public GenericLanceReader(
      Schema expectedSchema, TypeDescription readLanceSchema, Map<Integer, ?> idToConstant) {
    // TODO LANCE-1: build the reader via a LanceSchemaWithTypeVisitor, analogous to
    // GenericOrcReader. Until then this instance cannot read rows.
    this.reader = null;
  }

  /** Builds a reader with no injected constants. */
  public static LanceRowReader<Record> buildReader(
      Schema expectedSchema, TypeDescription fileSchema) {
    return new GenericLanceReader(expectedSchema, fileSchema, Collections.emptyMap());
  }

  /** Builds a reader that injects the given constant values by Iceberg field id. */
  public static LanceRowReader<Record> buildReader(
      Schema expectedSchema, TypeDescription fileSchema, Map<Integer, ?> idToConstant) {
    return new GenericLanceReader(expectedSchema, fileSchema, idToConstant);
  }

  @Override
  public Record read(VectorizedRowBatch batch, int row) {
    // Fail fast rather than returning null rows to the caller.
    throw new UnsupportedOperationException(
        "Lance reader is not implemented yet (TODO LANCE-1)");
  }

  @Override
  public void setBatchContext(long batchOffsetInFile) {
    // reader is always null here; throw an explicit error instead of an NPE.
    throw new UnsupportedOperationException(
        "Lance reader is not implemented yet (TODO LANCE-1)");
  }
}
Loading