From f35d1f4fe3ef089ad675c31edd177169e496d42a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:17:28 -0800 Subject: [PATCH 01/14] Add table changes constructor --- .../data/table-with-cdf/_delta_log/00000000000000000002.json | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json index a85f485a1e..eef316c843 100644 --- a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json @@ -1,4 +1,5 @@ {"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}} {"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "false"}}} {"remove":{"path":"fake/path/1","deletionTimestamp":1704392846603,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}} {"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} From 356622079afb9f900c8c81c2ebc3fb34e654a6ef Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:30:58 -0800 Subject: [PATCH 02/14] Fix docs --- kernel/src/table_changes/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index af0096c9b8..75fda08c27 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -70,6 +70,8 @@ impl TableChanges { /// Note that this does not check that change data feed is enabled for every commit in the /// range. It also does not check that the schema remains the same for the entire range. /// + /// Note that this does not check that change data feed is enabled for every commit in the + /// range. It also does not check that the schema remains the same for the entire range. /// # Parameters /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. From 63aba2f0b2fbd2a0a476495cff1e256068d0c436 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 16:10:50 -0800 Subject: [PATCH 03/14] Add test for schema, asserting it is equal --- kernel/src/table_changes/mod.rs | 7 +++++-- .../table-with-cdf/_delta_log/00000000000000000002.json | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 75fda08c27..26f61ff68c 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -70,8 +70,6 @@ impl TableChanges { /// Note that this does not check that change data feed is enabled for every commit in the /// range. It also does not check that the schema remains the same for the entire range. /// - /// Note that this does not check that change data feed is enabled for every commit in the - /// range. It also does not check that the schema remains the same for the entire range. /// # Parameters /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. @@ -117,6 +115,11 @@ impl TableChanges { "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), ))); } + if start_snapshot.schema() != end_snapshot.schema() { + return Err(Error::generic( + "Failed to build TableChanges: Start and end version schemas are different.", + )); + } let log_root = table_root.join("_delta_log/")?; let log_segment = LogSegment::for_table_changes( diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json index eef316c843..a85f485a1e 100644 --- a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json @@ -1,5 +1,4 @@ {"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} -{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}} {"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "false"}}} {"remove":{"path":"fake/path/1","deletionTimestamp":1704392846603,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}} {"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} From c5c79e68c2d325b490eac6433da4d610faeb1d63 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 17:44:55 -0800 Subject: [PATCH 04/14] Add table changes schema --- kernel/src/table_changes/mod.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 26f61ff68c..348d23565e 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -19,7 +19,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { }); /// Represents a call to read the Change Data Feed (CDF) between two versions of a table. The schema of -/// `TableChanges` will be the schema of the table at the end verios with three additional columns: +/// `TableChanges` will be the schema of the table at the end version with three additional columns: /// - `_change_type`: String representing the type of change that for that commit. This may be one /// of `delete`, `insert`, `update_preimage`, or `update_postimage`. /// - `_commit_version`: Long representing the commit the change occurred in. @@ -179,9 +179,17 @@ impl TableChanges { #[cfg(test)] mod tests { use crate::engine::sync::SyncEngine; +<<<<<<< HEAD use crate::schema::{DataType, StructField}; use crate::table_changes::CDF_FIELDS; use crate::{Error, Table}; +======= + use crate::schema::DataType; + use crate::schema::StructField; + use crate::table_changes::CDF_FIELDS; + use crate::Error; + use crate::Table; +>>>>>>> 073ae49 (Add table changes schema) use itertools::assert_equal; #[test] @@ -233,4 +241,20 @@ mod tests { let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap(); assert_equal(expected_schema, table_changes.schema().fields().cloned()); } + + #[test] + fn test_table_changes_has_cdf_schema() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + let expected_schema = [ + StructField::new("part", DataType::INTEGER, true), + StructField::new("id", DataType::INTEGER, true), + ] + .into_iter() + .chain(CDF_FIELDS.clone()); + + let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap(); + assert_equal(expected_schema, table_changes.schema().fields().cloned()); + } } From fe8cea594f25a0950715393b27d01f7409395e6e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 09:49:10 -0800 Subject: [PATCH 05/14] add schema to error message --- kernel/src/table_changes/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 348d23565e..dabfba6116 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -116,9 +116,9 @@ impl TableChanges { ))); } if start_snapshot.schema() != end_snapshot.schema() { - return Err(Error::generic( - "Failed to build TableChanges: Start and end version schemas are different.", - )); + return Err(Error::generic(format!( + "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), + ))); } let log_root = table_root.join("_delta_log/")?; From ef9ffa059492e287f80fb49eca05fc4d4bae4be8 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 14:24:17 -0800 Subject: [PATCH 06/14] Add tablechangesscan --- kernel/src/scan/mod.rs | 2 + kernel/src/table_changes/mod.rs | 23 ++-- kernel/src/table_changes/scan.rs | 204 +++++++++++++++++++++++++++++++ 3 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 kernel/src/table_changes/scan.rs diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index a5fa669f11..ef91b054c2 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -159,6 +159,8 @@ impl ScanResult { /// store the name of the column, as that's all that's needed during the actual query. For /// `Partition` we store an index into the logical schema for this query since later we need the /// data type as well to materialize the partition column. +#[allow(unused)] +#[derive(Debug, PartialEq, Eq)] pub enum ColumnType { // A column, selected from the data, as is Selected(String), diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index dabfba6116..a995ca6db2 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -1,6 +1,7 @@ //! Provides an API to read the table's change data feed between two versions. -use std::sync::LazyLock; +use std::sync::{Arc, LazyLock}; +use scan::TableChangesScanBuilder; use url::Url; use crate::log_segment::LogSegment; @@ -10,6 +11,8 @@ use crate::snapshot::Snapshot; use crate::table_features::ColumnMappingMode; use crate::{DeltaResult, Engine, Error, Version}; +pub mod scan; + static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { [ StructField::new("_change_type", DataType::STRING, false), @@ -174,22 +177,24 @@ impl TableChanges { pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode { &self.end_snapshot.column_mapping_mode } + + /// Create a [`TableChangesScanBuilder`] for an `Arc`. + pub fn scan_builder(self: Arc) -> TableChangesScanBuilder { + TableChangesScanBuilder::new(self) + } + + /// Consume this `TableChanges` to create a [`TableChangesScanBuilder`] + pub fn into_scan_builder(self) -> TableChangesScanBuilder { + TableChangesScanBuilder::new(self) + } } #[cfg(test)] mod tests { use crate::engine::sync::SyncEngine; -<<<<<<< HEAD use crate::schema::{DataType, StructField}; use crate::table_changes::CDF_FIELDS; use crate::{Error, Table}; -======= - use crate::schema::DataType; - use crate::schema::StructField; - use crate::table_changes::CDF_FIELDS; - use crate::Error; - use crate::Table; ->>>>>>> 073ae49 (Add table changes schema) use itertools::assert_equal; #[test] diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs new file mode 100644 index 0000000000..7dfa79fb26 --- /dev/null +++ b/kernel/src/table_changes/scan.rs @@ -0,0 +1,204 @@ +use std::sync::Arc; + +use itertools::Itertools; +use tracing::debug; + +use crate::scan::ColumnType; +use crate::schema::SchemaRef; +use crate::{DeltaResult, ExpressionRef}; + +use super::{TableChanges, CDF_FIELDS}; + +/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change +/// data feed from the table +#[allow(unused)] +#[derive(Debug)] +pub struct TableChangesScan { + table_changes: Arc, + logical_schema: SchemaRef, + predicate: Option, + all_fields: Vec, + have_partition_cols: bool, +} + +/// Builder to read the `TableChanges` of a table. +pub struct TableChangesScanBuilder { + table_changes: Arc, + schema: Option, + predicate: Option, +} + +impl TableChangesScanBuilder { + /// Create a new [`TableChangesScanBuilder`] instance. + pub fn new(table_changes: impl Into>) -> Self { + Self { + table_changes: table_changes.into(), + schema: None, + predicate: None, + } + } + + /// Provide [`Schema`] for columns to select from the [`TableChanges`]. + /// + /// A table with columns `[a, b, c]` could have a scan which reads only the first + /// two columns by using the schema `[a, b]`. + /// + /// [`Schema`]: crate::schema::Schema + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } + + /// Optionally provide a [`SchemaRef`] for columns to select from the [`TableChanges`]. See + /// [`TableChangesScanBuilder::with_schema`] for details. If `schema_opt` is `None` this is a no-op. + pub fn with_schema_opt(self, schema_opt: Option) -> Self { + match schema_opt { + Some(schema) => self.with_schema(schema), + None => self, + } + } + + /// Optionally provide an expression to filter rows. For example, using the predicate `x < + /// 4` to return a subset of the rows in the scan which satisfy the filter. If `predicate_opt` + /// is `None`, this is a no-op. + /// + /// NOTE: The filtering is best-effort and can produce false positives (rows that should should + /// have been filtered out but were kept). + pub fn with_predicate(mut self, predicate: impl Into>) -> Self { + self.predicate = predicate.into(); + self + } + + /// Build the [`TableChangesScan`]. + /// + /// This does not scan the table at this point, but does do some work to ensure that the + /// provided schema make sense, and to prepare some metadata that the scan will need. The + /// [`TableChangesScan`] type itself can be used to fetch the files and associated metadata required to + /// perform actual data reads. + pub fn build(self) -> DeltaResult { + // if no schema is provided, use snapshot's entire schema (e.g. SELECT *) + let logical_schema = self + .schema + .unwrap_or(self.table_changes.schema.clone().into()); + let mut have_partition_cols = false; + let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); + + // Loop over all selected fields and note if they are columns that will be read from the + // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to + // be filled in by evaluating an expression ([`ColumnType::Partition`]) + println!("Logical schema: {:?}", logical_schema); + let column_types = logical_schema + .fields() + .enumerate() + .map(|(index, logical_field)| -> DeltaResult<_> { + if self + .table_changes + .partition_columns() + .contains(logical_field.name()) + { + // Store the index into the schema for this field. When we turn it into an + // expression in the inner loop, we will index into the schema and get the name and + // data type, which we need to properly materialize the column. + have_partition_cols = true; + Ok(ColumnType::Partition(index)) + } else if CDF_FIELDS + .iter() + .any(|field| field.name() == logical_field.name()) + { + // CDF Columns are generated, so they do not have a column mapping. + Ok(ColumnType::Selected(logical_field.name().to_string())) + } else { + // Add to read schema, store field so we can build a `Column` expression later + // if needed (i.e. if we have partition columns) + let physical_field = + logical_field.make_physical(*self.table_changes.column_mapping_mode())?; + debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); + let physical_name = physical_field.name.clone(); + read_fields.push(physical_field); + Ok(ColumnType::Selected(physical_name)) + } + }) + .try_collect()?; + Ok(TableChangesScan { + table_changes: self.table_changes, + logical_schema, + predicate: self.predicate, + all_fields: column_types, + have_partition_cols, + }) + } +} +#[cfg(test)] +mod tests { + + use std::sync::Arc; + + use crate::engine::sync::SyncEngine; + use crate::expressions::{column_expr, Scalar}; + use crate::scan::ColumnType; + use crate::schema::{DataType, StructField, StructType}; + use crate::{Expression, Table}; + + #[test] + fn simple_table_changes_scan_builder() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + // A field in the schema goes from being nullable to non-nullable + let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); + + let scan = table_changes.into_scan_builder().build().unwrap(); + assert_eq!( + scan.all_fields, + vec![ + ColumnType::Selected("part".to_string()), + ColumnType::Selected("id".to_string()), + ColumnType::Selected("_change_type".to_string()), + ColumnType::Selected("_commit_version".to_string()), + ColumnType::Selected("_commit_timestamp".to_string()), + ] + ); + assert_eq!(scan.predicate, None); + assert!(!scan.have_partition_cols); + } + + #[test] + fn projected_and_filtered_table_changes_scan_builder() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + // A field in the schema goes from being nullable to non-nullable + let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); + + let schema = table_changes + .schema() + .project(&["id", "_commit_version"]) + .unwrap(); + let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); + let scan = table_changes + .into_scan_builder() + .with_schema(schema) + .with_predicate(predicate.clone()) + .build() + .unwrap(); + assert_eq!( + scan.all_fields, + vec![ + ColumnType::Selected("id".to_string()), + ColumnType::Selected("_commit_version".to_string()), + ] + ); + assert_eq!( + scan.logical_schema, + StructType::new([ + StructField::new("id", DataType::INTEGER, true), + StructField::new("_commit_version", DataType::LONG, false), + ]) + .into() + ); + assert!(!scan.have_partition_cols); + assert_eq!(scan.predicate, Some(predicate)); + } +} From 58e7cfd35d7d76b50cf7987828583a83a074a5cc Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 14:47:20 -0800 Subject: [PATCH 07/14] table_changes --- kernel/src/table_changes/scan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 7dfa79fb26..b579718f87 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -13,6 +13,7 @@ use super::{TableChanges, CDF_FIELDS}; /// data feed from the table #[allow(unused)] #[derive(Debug)] +#[allow(unused)] pub struct TableChangesScan { table_changes: Arc, logical_schema: SchemaRef, From da8e8b0669b2c62f7ab8c68d01a9305ab0fee679 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 15:16:53 -0800 Subject: [PATCH 08/14] Remove derives --- kernel/src/scan/mod.rs | 2 +- kernel/src/table_changes/scan.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index ef91b054c2..6d502251a8 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -160,7 +160,7 @@ impl ScanResult { /// `Partition` we store an index into the logical schema for this query since later we need the /// data type as well to materialize the partition column. #[allow(unused)] -#[derive(Debug, PartialEq, Eq)] +#[derive(PartialEq, Debug)] pub enum ColumnType { // A column, selected from the data, as is Selected(String), diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index b579718f87..7dfa79fb26 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -13,7 +13,6 @@ use super::{TableChanges, CDF_FIELDS}; /// data feed from the table #[allow(unused)] #[derive(Debug)] -#[allow(unused)] pub struct TableChangesScan { table_changes: Arc, logical_schema: SchemaRef, From 030c64ace28b11161a7280844b37516558bd4faf Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 20:30:21 -0800 Subject: [PATCH 09/14] Add docs --- kernel/src/table_changes/scan.rs | 49 ++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 7dfa79fb26..cbaf17e657 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -21,7 +21,42 @@ pub struct TableChangesScan { have_partition_cols: bool, } -/// Builder to read the `TableChanges` of a table. +/// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`] +/// of a table. [`TableChangesScanBuilder`] allows you to specify a schema to project the columns +/// or specify a predicate to filter rows in the Change Data Feed. Note that predicates over Change +/// Data Feed columns `_change_type`, `_commit_version`, and `_commit_timestamp` are not currently +/// allowed. See issue [#525](https://github.com/delta-io/delta-kernel-rs/issues/525). +/// +/// #Examples +/// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate +/// ```rust +/// # use std::sync::Arc; +/// # use delta_kernel::engine::sync::SyncEngine; +/// # use delta_kernel::expressions::{column_expr, Scalar}; +/// # use delta_kernel::scan::ColumnType; +/// # use delta_kernel::schema::{DataType, StructField, StructType}; +/// # use delta_kernel::{Expression, Table}; +/// # let path = "./tests/data/table-with-cdf"; +/// # let engine = Box::new(SyncEngine::new()); +/// # let table = Table::try_from_uri(path).unwrap(); +/// # let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); +/// let schema = table_changes +/// .schema() +/// .project(&["id", "_commit_version"]) +/// .unwrap(); +/// let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); +/// let scan = table_changes +/// .into_scan_builder() +/// .with_schema(schema) +/// .with_predicate(predicate.clone()) +/// .build(); +/// ``` +/// +/// Note: There is a lot of shared functionality between [`TableChangesScanBuilder`] and +/// [`ScanBuilder`]. +/// +/// [`ScanBuilder`]: crate::scan::ScanBuilder +#[derive(Debug)] pub struct TableChangesScanBuilder { table_changes: Arc, schema: Option, @@ -51,11 +86,9 @@ impl TableChangesScanBuilder { /// Optionally provide a [`SchemaRef`] for columns to select from the [`TableChanges`]. See /// [`TableChangesScanBuilder::with_schema`] for details. If `schema_opt` is `None` this is a no-op. - pub fn with_schema_opt(self, schema_opt: Option) -> Self { - match schema_opt { - Some(schema) => self.with_schema(schema), - None => self, - } + pub fn with_schema_opt(mut self, schema_opt: Option) -> Self { + self.schema = schema_opt; + self } /// Optionally provide an expression to filter rows. For example, using the predicate `x < @@ -76,10 +109,10 @@ impl TableChangesScanBuilder { /// [`TableChangesScan`] type itself can be used to fetch the files and associated metadata required to /// perform actual data reads. pub fn build(self) -> DeltaResult { - // if no schema is provided, use snapshot's entire schema (e.g. SELECT *) + // if no schema is provided, use `TableChanges`'s entire (logical) schema (e.g. SELECT *) let logical_schema = self .schema - .unwrap_or(self.table_changes.schema.clone().into()); + .unwrap_or_else(|| self.table_changes.schema.clone().into()); let mut have_partition_cols = false; let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); From 3a38232404d03df52aacc2cb41f8b6701157e53a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 20:42:48 -0800 Subject: [PATCH 10/14] Add comments for all_fileds --- kernel/src/table_changes/scan.rs | 33 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index cbaf17e657..e3d8107c2d 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -33,8 +33,6 @@ pub struct TableChangesScan { /// # use std::sync::Arc; /// # use delta_kernel::engine::sync::SyncEngine; /// # use delta_kernel::expressions::{column_expr, Scalar}; -/// # use delta_kernel::scan::ColumnType; -/// # use delta_kernel::schema::{DataType, StructField, StructType}; /// # use delta_kernel::{Expression, Table}; /// # let path = "./tests/data/table-with-cdf"; /// # let engine = Box::new(SyncEngine::new()); @@ -79,15 +77,8 @@ impl TableChangesScanBuilder { /// two columns by using the schema `[a, b]`. /// /// [`Schema`]: crate::schema::Schema - pub fn with_schema(mut self, schema: SchemaRef) -> Self { - self.schema = Some(schema); - self - } - - /// Optionally provide a [`SchemaRef`] for columns to select from the [`TableChanges`]. See - /// [`TableChangesScanBuilder::with_schema`] for details. If `schema_opt` is `None` this is a no-op. - pub fn with_schema_opt(mut self, schema_opt: Option) -> Self { - self.schema = schema_opt; + pub fn with_schema(mut self, schema: impl Into>) -> Self { + self.schema = schema.into(); self } @@ -116,11 +107,15 @@ impl TableChangesScanBuilder { let mut have_partition_cols = false; let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); - // Loop over all selected fields and note if they are columns that will be read from the - // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to - // be filled in by evaluating an expression ([`ColumnType::Partition`]) - println!("Logical schema: {:?}", logical_schema); - let column_types = logical_schema + // Loop over all selected fields. We produce the following: + // - If the field is read from the parquet file then it is ([`ColumnType::Selected`]). + // - If the field is a column generated by CDF, it is also ([`ColumnType::Selected`]). + // These fields will be handled separately from the other ([`ColumnType::Selected`]). + // - If the field is a partition column, it is ([`ColumnType::Partition`]). + // + // Both the partition columns and CDF generated columns will be filled in by evaluating an + // expression when transforming physical data to the logical representation. + let all_fields = logical_schema .fields() .enumerate() .map(|(index, logical_field)| -> DeltaResult<_> { @@ -138,7 +133,9 @@ impl TableChangesScanBuilder { .iter() .any(|field| field.name() == logical_field.name()) { - // CDF Columns are generated, so they do not have a column mapping. + // CDF Columns are generated, so they do not have a column mapping. These will + // be processed separately and used to build an expression when transforming physical + // data to logical. Ok(ColumnType::Selected(logical_field.name().to_string())) } else { // Add to read schema, store field so we can build a `Column` expression later @@ -156,7 +153,7 @@ impl TableChangesScanBuilder { table_changes: self.table_changes, logical_schema, predicate: self.predicate, - all_fields: column_types, + all_fields, have_partition_cols, }) } From 113b267b1f71b9ee95032bd7807162985d9ac890 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 20:49:10 -0800 Subject: [PATCH 11/14] Address pr comments --- kernel/src/scan/mod.rs | 1 - kernel/src/table_changes/scan.rs | 9 ++++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 6d502251a8..c4c5873ef1 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -159,7 +159,6 @@ impl ScanResult { /// store the name of the column, as that's all that's needed during the actual query. For /// `Partition` we store an index into the logical schema for this query since later we need the /// data type as well to materialize the partition column. -#[allow(unused)] #[derive(PartialEq, Debug)] pub enum ColumnType { // A column, selected from the data, as is diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index e3d8107c2d..b79ad8e029 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -27,6 +27,10 @@ pub struct TableChangesScan { /// Data Feed columns `_change_type`, `_commit_version`, and `_commit_timestamp` are not currently /// allowed. See issue [#525](https://github.com/delta-io/delta-kernel-rs/issues/525). /// +/// Note: There is a lot of shared functionality between [`TableChangesScanBuilder`] and +/// [`ScanBuilder`]. +/// +/// [`ScanBuilder`]: crate::scan::ScanBuilder /// #Examples /// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate /// ```rust @@ -49,11 +53,6 @@ pub struct TableChangesScan { /// .with_predicate(predicate.clone()) /// .build(); /// ``` -/// -/// Note: There is a lot of shared functionality between [`TableChangesScanBuilder`] and -/// [`ScanBuilder`]. -/// -/// [`ScanBuilder`]: crate::scan::ScanBuilder #[derive(Debug)] pub struct TableChangesScanBuilder { table_changes: Arc, From 2c538527f362469587c042ad7c9ac37c740d3fc2 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 22 Nov 2024 13:55:28 -0800 Subject: [PATCH 12/14] Remove duplicate line from rebase --- kernel/src/table_changes/mod.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index a995ca6db2..c47ca61dab 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -112,12 +112,6 @@ impl TableChanges { // compatibility for each schema update in the CDF range. // Note: Schema compatibility check will be changed in the future to be more flexible. // See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) - - if start_snapshot.schema() != end_snapshot.schema() { - return Err(Error::generic(format!( - "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), - ))); - } if start_snapshot.schema() != end_snapshot.schema() { return Err(Error::generic(format!( "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), From f7c00a8db9dad4edef75c36489e9db5ab4b514d0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 22 Nov 2024 13:57:15 -0800 Subject: [PATCH 13/14] remove duplicate test --- kernel/src/table_changes/mod.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index c47ca61dab..7f4b8963a9 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -240,20 +240,4 @@ mod tests { let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap(); assert_equal(expected_schema, table_changes.schema().fields().cloned()); } - - #[test] - fn test_table_changes_has_cdf_schema() { - let path = "./tests/data/table-with-cdf"; - let engine = Box::new(SyncEngine::new()); - let table = Table::try_from_uri(path).unwrap(); - let expected_schema = [ - StructField::new("part", DataType::INTEGER, true), - StructField::new("id", DataType::INTEGER, true), - ] - .into_iter() - .chain(CDF_FIELDS.clone()); - - let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap(); - assert_equal(expected_schema, table_changes.schema().fields().cloned()); - } } From 53732c46a0da484a9c3bda728f939ace98bf28c4 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 22 Nov 2024 15:39:14 -0800 Subject: [PATCH 14/14] add clarifying comment to test --- kernel/src/table_changes/scan.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index b79ad8e029..34161a96f5 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -178,6 +178,7 @@ mod tests { let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); let scan = table_changes.into_scan_builder().build().unwrap(); + // Note that this table is not partitioned. `part` is a regular field assert_eq!( scan.all_fields, vec![