Implement Builder for Scans on TableChanges #521
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #521 +/- ##
==========================================
+ Coverage 80.43% 80.58% +0.15%
==========================================
Files 62 63 +1
Lines 13645 13762 +117
Branches 13645 13762 +117
==========================================
+ Hits 10975 11090 +115
+ Misses 2114 2113 -1
- Partials 556 559 +3 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
zachschuermann
left a comment
There was a problem hiding this comment.
quick pass with some early comments
| Option::<Protocol>::get_struct_field(PROTOCOL_NAME), | ||
| Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME), | ||
| Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME), | ||
| Option::<Cdc>::get_struct_field(CDC_NAME), |
There was a problem hiding this comment.
Does this PR need to rebase? I could have sworn Cdc infra already merged?
There was a problem hiding this comment.
Yep, needs a rebase
TableChangs TableChanges
9ac7550 to
2e1fd66
Compare
| match schema_opt { | ||
| Some(schema) => self.with_schema(schema), | ||
| None => self, | ||
| } |
There was a problem hiding this comment.
does this also work?
| match schema_opt { | |
| Some(schema) => self.with_schema(schema), | |
| None => self, | |
| } | |
| schema_opt.map_or(self, |schema| self.with_schema(schema)) |
There was a problem hiding this comment.
Ah it doesn't because it sees self is moving ownership to map_or's default and also to the closure. Not allowed 😔
There was a problem hiding this comment.
map_or_else might delay the move enough for it to work?
schema_opt.map_or_else(|| self, |schema| self.with_schema(schema))| // Add to read schema, store field so we can build a `Column` expression later | ||
| // if needed (i.e. if we have partition columns) | ||
| let physical_field = | ||
| logical_field.make_physical(*self.table_changes.column_mapping_mode())?; |
There was a problem hiding this comment.
FYI this changes a lot to support column mapping and/or nested columns: #512
There was a problem hiding this comment.
Hmm how do you think we should handle this?
There was a problem hiding this comment.
This is an area where CDF is not different from a normal scan. We just need to leverage the same approach the other PR introduces. IMO we should NOT solve it here -- just let the other PR pick it up.
There was a problem hiding this comment.
sweet, ty!
| ColumnType::Selected("_change_type".to_string()), | ||
| ColumnType::Selected("_commit_version".to_string()), | ||
| ColumnType::Selected("_commit_timestamp".to_string()), |
There was a problem hiding this comment.
I guess these generated columns are not file-constant values, so we have to treat them like normal columns even tho they don't come from the parquet?
There was a problem hiding this comment.
Correct. I considered introducing a ColumnType::Generated. I feel like this is the "right" way, but may take some more discussion and changes to kernel/scan/mod.rs. If you think ColumnType::Generated is not controversial, we can go with that.
The current plan is to treat them as selected, and later check when transforming physical to logical:
match (column_type) {
ColumnType::Selected(col) => {
if CDF_FIELDS.contains(col) {
// Treat as CDF generated column
} else {
// Usual path for ColumnType::Selected
}
}
...
}
There was a problem hiding this comment.
@nicklan @zachschuermann If you feel strongly that we should do Generated or keep Selected, let me know!
There was a problem hiding this comment.
Let's keep it as-is for now. Once we add support for row tracking there will be more generated columns to worry about and we can revisit with more context available.
There was a problem hiding this comment.
Yeah, and I think the expression stuff will change this as well. Rather than a "selected" field, we'll just generate an expression to say, add a column with this value.
2e1fd66 to
a4f36d2
Compare
2e20fab to
2c53852
Compare
| ColumnType::Selected("_change_type".to_string()), | ||
| ColumnType::Selected("_commit_version".to_string()), | ||
| ColumnType::Selected("_commit_timestamp".to_string()), |
There was a problem hiding this comment.
Yeah, and I think the expression stuff will change this as well. Rather than a "selected" field, we'll just generate an expression to say, add a column with this value.
zachschuermann
left a comment
There was a problem hiding this comment.
LGTM just a couple nits!
| ColumnType::Selected("_change_type".to_string()), | ||
| ColumnType::Selected("_commit_version".to_string()), | ||
| ColumnType::Selected("_commit_timestamp".to_string()), |
|
|
||
| #[test] | ||
| fn simple_table_changes_scan_builder() { | ||
| let path = "./tests/data/table-with-cdf"; |
There was a problem hiding this comment.
nit: little bit confusing there's a column named 'part' but there are no partition columns. maybe just add a comment that this is a non-partitioned table?
What changes are proposed in this pull request?
This PR introduces the
TableChangesScanBuilderwhich constructs aTableChangesScangiven aTableChanges, and optionally, a predicate and schema.This introduces the following structs:
TableChangesScanTableChangesScanBuilderI also introduce methods to
TableChangesto get a builder:into_scan_builderscan_builderHow was this change tested?
I ensure that schema projection works for CDF's generated columns, predicates are correctly processed, and that the ColumnTypes are correctly created for the
TableChangesScan