Skip to content
Merged
1 change: 1 addition & 0 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl ScanResult {
/// store the name of the column, as that's all that's needed during the actual query. For
/// `Partition` we store an index into the logical schema for this query since later we need the
/// data type as well to materialize the partition column.
#[derive(PartialEq, Debug)]
pub enum ColumnType {
// A column, selected from the data, as is
Selected(String),
Expand Down
18 changes: 15 additions & 3 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Provides an API to read the table's change data feed between two versions.
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

use scan::TableChangesScanBuilder;
use url::Url;

use crate::log_segment::LogSegment;
Expand All @@ -10,6 +11,8 @@ use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, Error, Version};

pub mod scan;

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
StructField::new("_change_type", DataType::STRING, false),
Expand All @@ -19,7 +22,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
});

/// Represents a call to read the Change Data Feed (CDF) between two versions of a table. The schema of
/// `TableChanges` will be the schema of the table at the end verios with three additional columns:
/// `TableChanges` will be the schema of the table at the end version with three additional columns:
/// - `_change_type`: String representing the type of change for that commit. This may be one
/// of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
Expand Down Expand Up @@ -109,7 +112,6 @@ impl TableChanges {
// compatibility for each schema update in the CDF range.
// Note: Schema compatibility check will be changed in the future to be more flexible.
// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)

if start_snapshot.schema() != end_snapshot.schema() {
return Err(Error::generic(format!(
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
Expand Down Expand Up @@ -169,6 +171,16 @@ impl TableChanges {
pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
&self.end_snapshot.column_mapping_mode
}

/// Create a [`TableChangesScanBuilder`] for an `Arc<TableChanges>`.
///
/// The receiver is an `Arc<Self>` and is handed to the builder unchanged. Callers that want to
/// keep using this `TableChanges` afterwards can clone the `Arc` before calling this method.
pub fn scan_builder(self: Arc<Self>) -> TableChangesScanBuilder {
TableChangesScanBuilder::new(self)
}

/// Consume this `TableChanges` to create a [`TableChangesScanBuilder`].
///
/// Use this when the `TableChanges` is owned directly rather than held in an `Arc`; see
/// [`TableChanges::scan_builder`] for the `Arc<Self>` variant.
pub fn into_scan_builder(self) -> TableChangesScanBuilder {
TableChangesScanBuilder::new(self)
}
Comment thread
zachschuermann marked this conversation as resolved.
}

#[cfg(test)]
Expand Down
234 changes: 234 additions & 0 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use std::sync::Arc;

use itertools::Itertools;
use tracing::debug;

use crate::scan::ColumnType;
use crate::schema::SchemaRef;
use crate::{DeltaResult, ExpressionRef};

use super::{TableChanges, CDF_FIELDS};

/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
/// data feed from the table
#[allow(unused)]
Comment thread
zachschuermann marked this conversation as resolved.
#[derive(Debug)]
pub struct TableChangesScan {
/// The [`TableChanges`] that this scan was built from.
table_changes: Arc<TableChanges>,
/// The schema of the scan output: the schema given to the builder, or the entire schema of
/// the [`TableChanges`] when no schema was given.
logical_schema: SchemaRef,
/// The optional filter expression given to the builder.
predicate: Option<ExpressionRef>,
/// One entry per field of `logical_schema`, in schema order, recording whether the field is a
/// partition column or a selected column.
all_fields: Vec<ColumnType>,
/// `true` when at least one field of `logical_schema` is a partition column.
have_partition_cols: bool,
}

/// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`]
/// of a table. [`TableChangesScanBuilder`] allows you to specify a schema to project the columns
/// or specify a predicate to filter rows in the Change Data Feed. Note that predicates over Change
/// Data Feed columns `_change_type`, `_commit_version`, and `_commit_timestamp` are not currently
/// allowed. See issue [#525](https://github.com/delta-io/delta-kernel-rs/issues/525).
///
/// Note: There is a lot of shared functionality between [`TableChangesScanBuilder`] and
/// [`ScanBuilder`].
///
/// [`ScanBuilder`]: crate::scan::ScanBuilder
///
/// # Examples
/// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate
/// ```rust
/// # use std::sync::Arc;
/// # use delta_kernel::engine::sync::SyncEngine;
/// # use delta_kernel::expressions::{column_expr, Scalar};
/// # use delta_kernel::{Expression, Table};
/// # let path = "./tests/data/table-with-cdf";
/// # let engine = Box::new(SyncEngine::new());
/// # let table = Table::try_from_uri(path).unwrap();
/// # let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap();
/// let schema = table_changes
/// .schema()
/// .project(&["id", "_commit_version"])
/// .unwrap();
/// let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
/// let scan = table_changes
/// .into_scan_builder()
/// .with_schema(schema)
/// .with_predicate(predicate.clone())
/// .build();
/// ```
#[derive(Debug)]
pub struct TableChangesScanBuilder {
Comment thread
zachschuermann marked this conversation as resolved.
table_changes: Arc<TableChanges>,
schema: Option<SchemaRef>,
predicate: Option<ExpressionRef>,
}

impl TableChangesScanBuilder {
/// Create a new [`TableChangesScanBuilder`] instance.
pub fn new(table_changes: impl Into<Arc<TableChanges>>) -> Self {
Self {
table_changes: table_changes.into(),
schema: None,
predicate: None,
Comment thread
zachschuermann marked this conversation as resolved.
}
}

/// Provide [`Schema`] for columns to select from the [`TableChanges`].
///
/// A table with columns `[a, b, c]` could have a scan which reads only the first
/// two columns by using the schema `[a, b]`.
///
/// [`Schema`]: crate::schema::Schema
pub fn with_schema(mut self, schema: impl Into<Option<SchemaRef>>) -> Self {
self.schema = schema.into();
self
}

/// Optionally provide an expression to filter rows. For example, using the predicate `x <
/// 4` to return a subset of the rows in the scan which satisfy the filter. If `predicate_opt`
/// is `None`, this is a no-op.
///
/// NOTE: The filtering is best-effort and can produce false positives (rows that should should
/// have been filtered out but were kept).
pub fn with_predicate(mut self, predicate: impl Into<Option<ExpressionRef>>) -> Self {
self.predicate = predicate.into();
self
}

/// Build the [`TableChangesScan`].
///
/// This does not scan the table at this point, but does do some work to ensure that the
/// provided schema makes sense, and to prepare some metadata that the scan will need. The
/// [`TableChangesScan`] type itself can be used to fetch the files and associated metadata required to
/// perform actual data reads.
pub fn build(self) -> DeltaResult<TableChangesScan> {
// if no schema is provided, use `TableChanges`'s entire (logical) schema (e.g. SELECT *)
let logical_schema = self
.schema
.unwrap_or_else(|| self.table_changes.schema.clone().into());
let mut have_partition_cols = false;
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());

// Loop over all selected fields. We produce the following:
// - If the field is read from the parquet file then it is ([`ColumnType::Selected`]).
// - If the field is a column generated by CDF, it is also ([`ColumnType::Selected`]).
// These fields will be handled separately from the other ([`ColumnType::Selected`]).
// - If the field is a partition column, it is ([`ColumnType::Partition`]).
//
// Both the partition columns and CDF generated columns will be filled in by evaluating an
// expression when transforming physical data to the logical representation.
let all_fields = logical_schema
.fields()
.enumerate()
.map(|(index, logical_field)| -> DeltaResult<_> {
if self
.table_changes
.partition_columns()
.contains(logical_field.name())
{
// Store the index into the schema for this field. When we turn it into an
// expression in the inner loop, we will index into the schema and get the name and
// data type, which we need to properly materialize the column.
have_partition_cols = true;
Ok(ColumnType::Partition(index))
} else if CDF_FIELDS
.iter()
.any(|field| field.name() == logical_field.name())
{
// CDF Columns are generated, so they do not have a column mapping. These will
// be processed separately and used to build an expression when transforming physical
// data to logical.
Ok(ColumnType::Selected(logical_field.name().to_string()))
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field =
logical_field.make_physical(*self.table_changes.column_mapping_mode())?;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this changes a lot to support column mapping and/or nested columns: #512

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm how do you think we should handle this?

@scovich scovich Nov 22, 2024

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an area where CDF is not different from a normal scan. We just need to leverage the same approach the other PR introduces. IMO we should NOT solve it here -- just let the other PR pick it up.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sweet, ty!

debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let physical_name = physical_field.name.clone();
read_fields.push(physical_field);
Ok(ColumnType::Selected(physical_name))
}
})
.try_collect()?;
Ok(TableChangesScan {
table_changes: self.table_changes,
logical_schema,
predicate: self.predicate,
all_fields,
have_partition_cols,
})
}
}
#[cfg(test)]
mod tests {

use std::sync::Arc;

use crate::engine::sync::SyncEngine;
use crate::expressions::{column_expr, Scalar};
use crate::scan::ColumnType;
use crate::schema::{DataType, StructField, StructType};
use crate::{Expression, Table};

#[test]
fn simple_table_changes_scan_builder() {
let path = "./tests/data/table-with-cdf";

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: little bit confusing there's a column named 'part' but there are no partition columns. maybe just add a comment that this is a non-partitioned table?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 👍

let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();

// A field in the schema goes from being nullable to non-nullable
let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap();

let scan = table_changes.into_scan_builder().build().unwrap();
// Note that this table is not partitioned. `part` is a regular field
assert_eq!(
scan.all_fields,
vec![
ColumnType::Selected("part".to_string()),
ColumnType::Selected("id".to_string()),
ColumnType::Selected("_change_type".to_string()),
ColumnType::Selected("_commit_version".to_string()),
ColumnType::Selected("_commit_timestamp".to_string()),
Comment on lines +187 to +189

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess these generated columns are not file-constant values, so we have to treat them like normal columns even tho they don't come from the parquet?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I considered introducing a ColumnType::Generated. I feel like this is the "right" way, but may take some more discussion and changes to kernel/scan/mod.rs. If you think ColumnType::Generated is not controversial, we can go with that.

The current plan is to treat them as selected, and later check when transforming physical to logical:

match (column_type) {
    ColumnType::Selected(col) => {
       if CDF_FIELDS.contains(col) {
           // Treat as CDF generated column
       } else {
           // Usual path for ColumnType::Selected
       }
   }
   ...
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nicklan @zachschuermann If you feel strongly that we should do Generated or keep Selected, let me know!

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it as-is for now. Once we add support for row tracking there will be more generated columns to worry about and we can revisit with more context available.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, and I think the expression stuff will change this as well. Rather than a "selected" field, we'll just generate an expression to say, add a column with this value.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure SGTM :)

]
);
assert_eq!(scan.predicate, None);
assert!(!scan.have_partition_cols);
}

#[test]
fn projected_and_filtered_table_changes_scan_builder() {
    // Open the change data feed of the CDF test table between versions 0 and 1.
    let engine = Box::new(SyncEngine::new());
    let table = Table::try_from_uri("./tests/data/table-with-cdf").unwrap();
    let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap();

    // Project one data column and one CDF-generated column, and filter on `id > 10`.
    let projection = table_changes
        .schema()
        .project(&["id", "_commit_version"])
        .unwrap();
    let filter = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));

    let builder = table_changes
        .into_scan_builder()
        .with_schema(projection)
        .with_predicate(filter.clone());
    let scan = builder.build().unwrap();

    // Both projected columns are reported as `Selected`, in projection order.
    let expected_fields = vec![
        ColumnType::Selected("id".to_string()),
        ColumnType::Selected("_commit_version".to_string()),
    ];
    assert_eq!(scan.all_fields, expected_fields);

    // The logical schema is exactly the projection that was requested.
    assert_eq!(
        scan.logical_schema,
        StructType::new([
            StructField::new("id", DataType::INTEGER, true),
            StructField::new("_commit_version", DataType::LONG, false),
        ])
        .into()
    );

    // Neither projected column is a partition column, and the predicate is carried through.
    assert!(!scan.have_partition_cols);
    assert_eq!(scan.predicate, Some(filter));
}
}